You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ni...@apache.org on 2022/09/29 12:56:07 UTC

[rocketmq-flink] 01/01: Revert "Fix some problems with the catalog (#58)"

This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch revert-58-feature_catalog_fix
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 9591c0c67fe092f158f482add411f7be2c417f6d
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Thu Sep 29 20:56:02 2022 +0800

    Revert "Fix some problems with the catalog (#58)"
    
    This reverts commit 6909cbc5d2c3c6a0590d5a7bd32bd9f45cf18f97.
---
 .../org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java   | 2 --
 .../rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java       | 2 +-
 .../apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java  | 6 ++----
 3 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
index 609708f..37d0b3c 100644
--- a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
@@ -61,8 +61,6 @@ public class RocketMQCatalogFactory implements CatalogFactory {
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(DEFAULT_DATABASE);
-        options.add(NAME_SERVER_ADDR);
-        options.add(SCHEMA_REGISTRY_BASE_URL);
         return options;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
index 624539b..25226b2 100644
--- a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.catalog.CommonCatalogOptions;
 @Internal
 public final class RocketMQCatalogFactoryOptions {
 
-    public static final String IDENTIFIER = "rocketmq_catalog";
+    public static final String IDENTIFIER = "rocketmq-catalog";
 
     public static final ConfigOption<String> DEFAULT_DATABASE =
             ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
index 366991d..f61cbda 100644
--- a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
@@ -242,10 +242,8 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
         Properties producerProps = new Properties();
         producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, producerGroup);
         producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress);
-        if (accessKey != null && secretKey != null) {
-            producerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
-            producerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
-        }
+        producerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
+        producerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
         return producerProps;
     }