You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ni...@apache.org on 2022/07/26 09:54:43 UTC

[rocketmq-flink] branch main updated: [issues-44] support access control

This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new 034c4f2  [issues-44] support access control
     new 87fb9c2  Merge pull request #45 from lizhimins/support-acl
034c4f2 is described below

commit 034c4f2b7108ab9f76ebcd127078bdbb8cc26aa4
Author: 斜阳 <te...@alibaba-inc.com>
AuthorDate: Sun Jul 24 16:28:39 2022 +0800

    [issues-44] support access control
---
 .../rocketmq/flink/common/RocketMQOptions.java     |  7 ++++
 .../flink/sink/table/RocketMQDynamicTableSink.java | 48 ++++++++++++++++++++++
 .../table/RocketMQDynamicTableSinkFactory.java     |  9 ++++
 .../rocketmq/flink/source/RocketMQSource.java      | 43 +++++++++++++++++++
 .../enumerator/RocketMQSourceEnumerator.java       | 26 +++++++++++-
 .../reader/RocketMQPartitionSplitReader.java       | 23 +++++++++--
 .../RocketMQDeserializationSchema.java             |  2 +
 .../table/RocketMQDynamicTableSourceFactory.java   |  4 ++
 .../source/table/RocketMQScanTableSource.java      | 40 ++++++++++++++++++
 9 files changed, 198 insertions(+), 4 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
index d14b063..dee3b96 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
@@ -83,6 +83,7 @@ public class RocketMQOptions {
 
     public static final ConfigOption<Long> OPTIONAL_WRITE_SLEEP_TIME_MS =
             ConfigOptions.key("sleepTimeMs").longType().defaultValue(5000L);
+
     public static final ConfigOption<Boolean> OPTIONAL_WRITE_IS_DYNAMIC_TAG =
             ConfigOptions.key("isDynamicTag").booleanType().defaultValue(false);
 
@@ -97,4 +98,10 @@ public class RocketMQOptions {
 
     public static final ConfigOption<Boolean> OPTIONAL_WRITE_KEYS_TO_BODY =
             ConfigOptions.key("writeKeysToBody").booleanType().defaultValue(false);
+
+    public static final ConfigOption<String> OPTIONAL_ACCESS_KEY =
+            ConfigOptions.key("accessKey").stringType().noDefaultValue();
+
+    public static final ConfigOption<String> OPTIONAL_SECRET_KEY =
+            ConfigOptions.key("secretKey").stringType().noDefaultValue();
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
index cd6261c..816449f 100644
--- a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
@@ -57,6 +57,9 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
     private final String fieldDelimiter;
     private final String encoding;
 
+    private final String accessKey;
+    private final String secretKey;
+
     private final long retryTimes;
     private final long sleepTime;
 
@@ -84,11 +87,52 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
             boolean isDynamicTagIncluded,
             boolean writeKeysToBody,
             String[] keyColumns) {
+
+        this(
+                properties,
+                schema,
+                topic,
+                producerGroup,
+                nameServerAddress,
+                null, // accessKey: this legacy overload carries no ACL credentials
+                null, // secretKey: see accessKey
+                tag,
+                dynamicColumn,
+                fieldDelimiter,
+                encoding,
+                retryTimes,
+                sleepTime,
+                isDynamicTag,
+                isDynamicTagIncluded,
+                writeKeysToBody,
+                keyColumns);
+    }
+
+    public RocketMQDynamicTableSink(
+            DescriptorProperties properties,
+            TableSchema schema,
+            String topic,
+            String producerGroup,
+            String nameServerAddress,
+            String accessKey,
+            String secretKey,
+            String tag,
+            String dynamicColumn,
+            String fieldDelimiter,
+            String encoding,
+            long retryTimes,
+            long sleepTime,
+            boolean isDynamicTag,
+            boolean isDynamicTagIncluded,
+            boolean writeKeysToBody,
+            String[] keyColumns) {
         this.properties = properties;
         this.schema = schema;
         this.topic = topic;
         this.producerGroup = producerGroup;
         this.nameServerAddress = nameServerAddress;
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
         this.tag = tag;
         this.dynamicColumn = dynamicColumn;
         this.fieldDelimiter = fieldDelimiter;
@@ -141,6 +185,8 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
                         topic,
                         producerGroup,
                         nameServerAddress,
+                        accessKey,
+                        secretKey,
                         tag,
                         dynamicColumn,
                         fieldDelimiter,
@@ -196,6 +242,13 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
         Properties producerProps = new Properties();
         producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, producerGroup);
         producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress);
+        // ACL credentials are optional; Properties.setProperty throws an NPE on a null value.
+        if (accessKey != null) {
+            producerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
+        }
+        if (secretKey != null) {
+            producerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
+        }
         return producerProps;
     }
 
diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java
index 72d29d5..2d46443 100644
--- a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java
@@ -37,8 +37,10 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.NAME_SERVER_ADDRESS;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ACCESS_KEY;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ENCODING;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_FIELD_DELIMITER;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SECRET_KEY;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TAG;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED;
@@ -83,6 +85,8 @@ public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory
         optionalOptions.add(OPTIONAL_WRITE_KEY_COLUMNS);
         optionalOptions.add(OPTIONAL_ENCODING);
         optionalOptions.add(OPTIONAL_FIELD_DELIMITER);
+        optionalOptions.add(OPTIONAL_ACCESS_KEY);
+        optionalOptions.add(OPTIONAL_SECRET_KEY);
         return optionalOptions;
     }
 
@@ -97,6 +101,8 @@ public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory
         String producerGroup = properties.getString(PRODUCER_GROUP);
         String nameServerAddress = properties.getString(NAME_SERVER_ADDRESS);
         String tag = properties.getString(OPTIONAL_TAG);
+        String accessKey = properties.getString(OPTIONAL_ACCESS_KEY);
+        String secretKey = properties.getString(OPTIONAL_SECRET_KEY);
         String dynamicColumn = properties.getString(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN);
         String encoding = properties.getString(OPTIONAL_ENCODING);
         String fieldDelimiter = properties.getString(OPTIONAL_FIELD_DELIMITER);
@@ -115,12 +121,15 @@ public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory
         descriptorProperties.putProperties(rawProperties);
         TableSchema physicalSchema =
                 TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
         return new RocketMQDynamicTableSink(
                 descriptorProperties,
                 physicalSchema,
                 topicName,
                 producerGroup,
                 nameServerAddress,
+                accessKey,
+                secretKey,
                 tag,
                 dynamicColumn,
                 fieldDelimiter,
diff --git a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
index 801814a..74e90e8 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
@@ -64,6 +64,9 @@ public class RocketMQSource<OUT>
     private final String tag;
     private final String sql;
 
+    private final String accessKey;
+    private final String secretKey;
+
     private final long stopInMs;
     private final long startTime;
     private final long startOffset;
@@ -85,12 +88,44 @@ public class RocketMQSource<OUT>
             long partitionDiscoveryIntervalMs,
             Boundedness boundedness,
             RocketMQDeserializationSchema<OUT> deserializationSchema) {
+        this(
+                topic,
+                consumerGroup,
+                nameServerAddress,
+                null,
+                null,
+                tag,
+                sql,
+                stopInMs,
+                startTime,
+                startOffset,
+                partitionDiscoveryIntervalMs,
+                boundedness,
+                deserializationSchema);
+    }
+
+    public RocketMQSource(
+            String topic,
+            String consumerGroup,
+            String nameServerAddress,
+            String accessKey,
+            String secretKey,
+            String tag,
+            String sql,
+            long stopInMs,
+            long startTime,
+            long startOffset,
+            long partitionDiscoveryIntervalMs,
+            Boundedness boundedness,
+            RocketMQDeserializationSchema<OUT> deserializationSchema) {
         Validate.isTrue(
                 !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
                 "Consumer tag and sql can not set value at the same time");
         this.topic = topic;
         this.consumerGroup = consumerGroup;
         this.nameServerAddress = nameServerAddress;
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
         this.tag = StringUtils.isEmpty(tag) ? RocketMQConfig.DEFAULT_CONSUMER_TAG : tag;
         this.sql = sql;
         this.stopInMs = stopInMs;
@@ -130,6 +165,8 @@ public class RocketMQSource<OUT>
                                 topic,
                                 consumerGroup,
                                 nameServerAddress,
+                                accessKey,
+                                secretKey,
                                 tag,
                                 sql,
                                 stopInMs,
@@ -149,10 +186,13 @@ public class RocketMQSource<OUT>
     @Override
     public SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> createEnumerator(
             SplitEnumeratorContext<RocketMQPartitionSplit> enumContext) {
+
         return new RocketMQSourceEnumerator(
                 topic,
                 consumerGroup,
                 nameServerAddress,
+                accessKey,
+                secretKey,
                 stopInMs,
                 startOffset,
                 partitionDiscoveryIntervalMs,
@@ -164,10 +204,13 @@ public class RocketMQSource<OUT>
     public SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> restoreEnumerator(
             SplitEnumeratorContext<RocketMQPartitionSplit> enumContext,
             RocketMQSourceEnumState checkpoint) {
+
         return new RocketMQSourceEnumerator(
                 topic,
                 consumerGroup,
                 nameServerAddress,
+                accessKey,
+                secretKey,
                 stopInMs,
                 startOffset,
                 partitionDiscoveryIntervalMs,
diff --git a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
index 61b563a..db593ec 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
@@ -18,6 +18,8 @@
 
 package org.apache.rocketmq.flink.source.enumerator;
 
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -31,6 +33,7 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,6 +73,11 @@ public class RocketMQSourceEnumerator
     /** The boundedness of this RocketMQSource. */
     private final Boundedness boundedness;
 
+    /** The accessKey used for this RocketMQSource. */
+    private final String accessKey;
+    /** The secretKey used for this RocketMQSource. */
+    private final String secretKey;
+
     private final SplitEnumeratorContext<RocketMQPartitionSplit> context;
 
     // The internal states of the enumerator.
@@ -94,6 +102,8 @@ public class RocketMQSourceEnumerator
             String topic,
             String consumerGroup,
             String nameServerAddress,
+            String accessKey,
+            String secretKey,
             long stopInMs,
             long startOffset,
             long partitionDiscoveryIntervalMs,
@@ -103,6 +113,8 @@ public class RocketMQSourceEnumerator
                 topic,
                 consumerGroup,
                 nameServerAddress,
+                accessKey,
+                secretKey,
                 stopInMs,
                 startOffset,
                 partitionDiscoveryIntervalMs,
@@ -115,6 +127,8 @@ public class RocketMQSourceEnumerator
             String topic,
             String consumerGroup,
             String nameServerAddress,
+            String accessKey,
+            String secretKey,
             long stopInMs,
             long startOffset,
             long partitionDiscoveryIntervalMs,
@@ -124,6 +138,8 @@ public class RocketMQSourceEnumerator
         this.topic = topic;
         this.consumerGroup = consumerGroup;
         this.nameServerAddress = nameServerAddress;
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
         this.stopInMs = stopInMs;
         this.startOffset = startOffset;
         this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
@@ -303,7 +319,15 @@ public class RocketMQSourceEnumerator
 
     private void initialRocketMQConsumer() {
         try {
-            consumer = new DefaultMQPullConsumer(consumerGroup);
+            if (!StringUtils.isNullOrWhitespaceOnly(accessKey)
+                    && !StringUtils.isNullOrWhitespaceOnly(secretKey)) {
+                AclClientRPCHook aclClientRPCHook =
+                        new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+                consumer = new DefaultMQPullConsumer(consumerGroup, aclClientRPCHook);
+            } else {
+                consumer = new DefaultMQPullConsumer(consumerGroup);
+            }
+
             consumer.setNamesrvAddr(nameServerAddress);
             consumer.setInstanceName(
                     String.join(
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
index f7af900..ad25f0e 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -18,6 +18,8 @@
 
 package org.apache.rocketmq.flink.source.reader;
 
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -73,6 +75,9 @@ public class RocketMQPartitionSplitReader<T>
     private final long startTime;
     private final long startOffset;
 
+    private final String accessKey;
+    private final String secretKey;
+
     private final RocketMQDeserializationSchema<T> deserializationSchema;
     private final Map<Tuple3<String, String, Integer>, Long> startingOffsets;
     private final Map<Tuple3<String, String, Integer>, Long> stoppingTimestamps;
@@ -88,6 +93,8 @@ public class RocketMQPartitionSplitReader<T>
             String topic,
             String consumerGroup,
             String nameServerAddress,
+            String accessKey,
+            String secretKey,
             String tag,
             String sql,
             long stopInMs,
@@ -97,6 +104,8 @@ public class RocketMQPartitionSplitReader<T>
         this.topic = topic;
         this.tag = tag;
         this.sql = sql;
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
         this.stopInMs = stopInMs;
         this.startTime = startTime;
         this.startOffset = startOffset;
@@ -104,7 +113,7 @@ public class RocketMQPartitionSplitReader<T>
         this.startingOffsets = new HashMap<>();
         this.stoppingTimestamps = new HashMap<>();
         this.collector = new SimpleCollector<>();
-        initialRocketMQConsumer(consumerGroup, nameServerAddress);
+        initialRocketMQConsumer(consumerGroup, nameServerAddress, accessKey, secretKey);
     }
 
     @Override
@@ -304,9 +313,17 @@ public class RocketMQPartitionSplitReader<T>
 
     // --------------- private helper method ----------------------
 
-    private void initialRocketMQConsumer(String consumerGroup, String nameServerAddress) {
+    private void initialRocketMQConsumer(
+            String consumerGroup, String nameServerAddress, String accessKey, String secretKey) {
+
         try {
-            consumer = new DefaultMQPullConsumer(consumerGroup);
+            if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {
+                AclClientRPCHook aclClientRPCHook =
+                        new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+                consumer = new DefaultMQPullConsumer(consumerGroup, aclClientRPCHook);
+            } else {
+                consumer = new DefaultMQPullConsumer(consumerGroup);
+            }
             consumer.setNamesrvAddr(nameServerAddress);
             consumer.setInstanceName(
                     String.join(
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
index 26612ea..e50b702 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
@@ -40,6 +40,7 @@ public interface RocketMQDeserializationSchema<T>
      *
      * @param context Contextual information that can be used during initialization.
      */
+    @Override
     @PublicEvolving
     default void open(InitializationContext context) {}
 
@@ -54,6 +55,7 @@ public interface RocketMQDeserializationSchema<T>
      * @param record The MessageExts to deserialize.
      * @param out The collector to put the resulting messages.
      */
+    @Override
     @PublicEvolving
     void deserialize(List<MessageExt> record, Collector<T> out) throws IOException;
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
index a1bbdba..4117cc1 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
@@ -44,6 +44,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.CONSUMER_GROUP;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.NAME_SERVER_ADDRESS;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ACCESS_KEY;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_COLUMN_ERROR_DEBUG;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ENCODING;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_END_TIME;
@@ -51,6 +52,7 @@ import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_FIELD_DE
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LENGTH_CHECK;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LINE_DELIMITER;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SECRET_KEY;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SQL;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET;
 import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_TIME;
@@ -99,6 +101,8 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
         optionalOptions.add(OPTIONAL_LINE_DELIMITER);
         optionalOptions.add(OPTIONAL_COLUMN_ERROR_DEBUG);
         optionalOptions.add(OPTIONAL_LENGTH_CHECK);
+        optionalOptions.add(OPTIONAL_ACCESS_KEY);
+        optionalOptions.add(OPTIONAL_SECRET_KEY);
         return optionalOptions;
     }
 
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
index 1c6c0a9..e75bfaa 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
@@ -62,6 +62,9 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
     private final String tag;
     private final String sql;
 
+    private final String accessKey;
+    private final String secretKey;
+
     private final long stopInMs;
     private final long partitionDiscoveryIntervalMs;
     private final long startMessageOffset;
@@ -83,11 +86,46 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
             long startTime,
             long partitionDiscoveryIntervalMs,
             boolean useNewApi) {
+
+        this(
+                properties,
+                schema,
+                topic,
+                consumerGroup,
+                nameServerAddress,
+                null,
+                null,
+                tag,
+                sql,
+                stopInMs,
+                startMessageOffset,
+                startTime,
+                partitionDiscoveryIntervalMs,
+                useNewApi);
+    }
+
+    public RocketMQScanTableSource(
+            DescriptorProperties properties,
+            TableSchema schema,
+            String topic,
+            String consumerGroup,
+            String nameServerAddress,
+            String accessKey,
+            String secretKey,
+            String tag,
+            String sql,
+            long stopInMs,
+            long startMessageOffset,
+            long startTime,
+            long partitionDiscoveryIntervalMs,
+            boolean useNewApi) {
         this.properties = properties;
         this.schema = schema;
         this.topic = topic;
         this.consumerGroup = consumerGroup;
         this.nameServerAddress = nameServerAddress;
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
         this.tag = tag;
         this.sql = sql;
         this.stopInMs = stopInMs;
@@ -149,6 +187,8 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
                         topic,
                         consumerGroup,
                         nameServerAddress,
+                        accessKey,
+                        secretKey,
                         tag,
                         sql,
                         stopInMs,