You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ni...@apache.org on 2022/07/26 09:54:43 UTC
[rocketmq-flink] branch main updated: [issues-44] support access control
This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new 034c4f2 [issues-44] support access control
new 87fb9c2 Merge pull request #45 from lizhimins/support-acl
034c4f2 is described below
commit 034c4f2b7108ab9f76ebcd127078bdbb8cc26aa4
Author: 斜阳 <te...@alibaba-inc.com>
AuthorDate: Sun Jul 24 16:28:39 2022 +0800
[issues-44] support access control
---
.../rocketmq/flink/common/RocketMQOptions.java | 7 ++++
.../flink/sink/table/RocketMQDynamicTableSink.java | 48 ++++++++++++++++++++++
.../table/RocketMQDynamicTableSinkFactory.java | 9 ++++
.../rocketmq/flink/source/RocketMQSource.java | 43 +++++++++++++++++++
.../enumerator/RocketMQSourceEnumerator.java | 26 +++++++++++-
.../reader/RocketMQPartitionSplitReader.java | 23 +++++++++--
.../RocketMQDeserializationSchema.java | 2 +
.../table/RocketMQDynamicTableSourceFactory.java | 4 ++
.../source/table/RocketMQScanTableSource.java | 40 ++++++++++++++++++
9 files changed, 198 insertions(+), 4 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
index d14b063..dee3b96 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
@@ -83,6 +83,7 @@ public class RocketMQOptions {
public static final ConfigOption<Long> OPTIONAL_WRITE_SLEEP_TIME_MS =
ConfigOptions.key("sleepTimeMs").longType().defaultValue(5000L);
+
public static final ConfigOption<Boolean> OPTIONAL_WRITE_IS_DYNAMIC_TAG =
ConfigOptions.key("isDynamicTag").booleanType().defaultValue(false);
@@ -97,4 +98,10 @@ public class RocketMQOptions {
public static final ConfigOption<Boolean> OPTIONAL_WRITE_KEYS_TO_BODY =
ConfigOptions.key("writeKeysToBody").booleanType().defaultValue(false);
+
+ /** Access key of the RocketMQ ACL credentials; optional, with no default value. */
+ public static final ConfigOption<String> OPTIONAL_ACCESS_KEY =
+ ConfigOptions.key("accessKey").stringType().noDefaultValue();
+
+ /** Secret key of the RocketMQ ACL credentials; optional, with no default value. */
+ public static final ConfigOption<String> OPTIONAL_SECRET_KEY =
+ ConfigOptions.key("secretKey").stringType().noDefaultValue();
}
diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
index cd6261c..816449f 100644
--- a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
@@ -57,6 +57,9 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
private final String fieldDelimiter;
private final String encoding;
+ private final String accessKey;
+ private final String secretKey;
+
private final long retryTimes;
private final long sleepTime;
@@ -84,11 +87,52 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
boolean isDynamicTagIncluded,
boolean writeKeysToBody,
String[] keyColumns) {
+
+ // Delegate to the ACL-aware constructor. The argument order must follow its
+ // signature: (..., topic, producerGroup, nameServerAddress, accessKey, secretKey, tag, ...).
+ this(
+ properties,
+ schema,
+ topic,
+ producerGroup,
+ nameServerAddress,
+ null, // accessKey: this overload takes no ACL credentials
+ null, // secretKey
+ tag,
+ dynamicColumn,
+ fieldDelimiter,
+ encoding,
+ retryTimes,
+ sleepTime,
+ isDynamicTag,
+ isDynamicTagIncluded,
+ writeKeysToBody,
+ keyColumns);
+ }
+
+ public RocketMQDynamicTableSink(
+ DescriptorProperties properties,
+ TableSchema schema,
+ String topic,
+ String producerGroup,
+ String nameServerAddress,
+ String accessKey,
+ String secretKey,
+ String tag,
+ String dynamicColumn,
+ String fieldDelimiter,
+ String encoding,
+ long retryTimes,
+ long sleepTime,
+ boolean isDynamicTag,
+ boolean isDynamicTagIncluded,
+ boolean writeKeysToBody,
+ String[] keyColumns) {
this.properties = properties;
this.schema = schema;
this.topic = topic;
this.producerGroup = producerGroup;
this.nameServerAddress = nameServerAddress;
+ this.accessKey = accessKey;
+ this.secretKey = secretKey;
this.tag = tag;
this.dynamicColumn = dynamicColumn;
this.fieldDelimiter = fieldDelimiter;
@@ -141,6 +185,8 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
topic,
producerGroup,
nameServerAddress,
+ accessKey,
+ secretKey,
tag,
dynamicColumn,
fieldDelimiter,
@@ -196,6 +242,8 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
Properties producerProps = new Properties();
producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, producerGroup);
producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress);
+ // The ACL options have no default value, so either key may be null here, and
+ // Properties.setProperty throws NullPointerException on a null value.
+ // Credentials are only forwarded when both keys are present.
+ if (accessKey != null && secretKey != null) {
+ producerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
+ producerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
+ }
return producerProps;
}
diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java
index 72d29d5..2d46443 100644
--- a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java
@@ -37,8 +37,10 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
import static org.apache.rocketmq.flink.common.RocketMQOptions.NAME_SERVER_ADDRESS;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ACCESS_KEY;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ENCODING;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_FIELD_DELIMITER;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SECRET_KEY;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TAG;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED;
@@ -83,6 +85,8 @@ public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory
optionalOptions.add(OPTIONAL_WRITE_KEY_COLUMNS);
optionalOptions.add(OPTIONAL_ENCODING);
optionalOptions.add(OPTIONAL_FIELD_DELIMITER);
+ optionalOptions.add(OPTIONAL_ACCESS_KEY);
+ optionalOptions.add(OPTIONAL_SECRET_KEY);
return optionalOptions;
}
@@ -97,6 +101,8 @@ public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory
String producerGroup = properties.getString(PRODUCER_GROUP);
String nameServerAddress = properties.getString(NAME_SERVER_ADDRESS);
String tag = properties.getString(OPTIONAL_TAG);
+ String accessKey = properties.getString(OPTIONAL_ACCESS_KEY);
+ String secretKey = properties.getString(OPTIONAL_SECRET_KEY);
String dynamicColumn = properties.getString(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN);
String encoding = properties.getString(OPTIONAL_ENCODING);
String fieldDelimiter = properties.getString(OPTIONAL_FIELD_DELIMITER);
@@ -115,12 +121,15 @@ public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory
descriptorProperties.putProperties(rawProperties);
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
return new RocketMQDynamicTableSink(
descriptorProperties,
physicalSchema,
topicName,
producerGroup,
nameServerAddress,
+ accessKey,
+ secretKey,
tag,
dynamicColumn,
fieldDelimiter,
diff --git a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
index 801814a..74e90e8 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
@@ -64,6 +64,9 @@ public class RocketMQSource<OUT>
private final String tag;
private final String sql;
+ private final String accessKey;
+ private final String secretKey;
+
private final long stopInMs;
private final long startTime;
private final long startOffset;
@@ -85,12 +88,44 @@ public class RocketMQSource<OUT>
long partitionDiscoveryIntervalMs,
Boundedness boundedness,
RocketMQDeserializationSchema<OUT> deserializationSchema) {
+ this(
+ topic,
+ consumerGroup,
+ nameServerAddress,
+ null,
+ null,
+ tag,
+ sql,
+ stopInMs,
+ startTime,
+ startOffset,
+ partitionDiscoveryIntervalMs,
+ boundedness,
+ deserializationSchema);
+ }
+
+ public RocketMQSource(
+ String topic,
+ String consumerGroup,
+ String nameServerAddress,
+ String accessKey,
+ String secretKey,
+ String tag,
+ String sql,
+ long stopInMs,
+ long startTime,
+ long startOffset,
+ long partitionDiscoveryIntervalMs,
+ Boundedness boundedness,
+ RocketMQDeserializationSchema<OUT> deserializationSchema) {
Validate.isTrue(
!(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
"Consumer tag and sql can not set value at the same time");
this.topic = topic;
this.consumerGroup = consumerGroup;
this.nameServerAddress = nameServerAddress;
+ this.accessKey = accessKey;
+ this.secretKey = secretKey;
this.tag = StringUtils.isEmpty(tag) ? RocketMQConfig.DEFAULT_CONSUMER_TAG : tag;
this.sql = sql;
this.stopInMs = stopInMs;
@@ -130,6 +165,8 @@ public class RocketMQSource<OUT>
topic,
consumerGroup,
nameServerAddress,
+ accessKey,
+ secretKey,
tag,
sql,
stopInMs,
@@ -149,10 +186,13 @@ public class RocketMQSource<OUT>
@Override
public SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> createEnumerator(
SplitEnumeratorContext<RocketMQPartitionSplit> enumContext) {
+
return new RocketMQSourceEnumerator(
topic,
consumerGroup,
nameServerAddress,
+ accessKey,
+ secretKey,
stopInMs,
startOffset,
partitionDiscoveryIntervalMs,
@@ -164,10 +204,13 @@ public class RocketMQSource<OUT>
public SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> restoreEnumerator(
SplitEnumeratorContext<RocketMQPartitionSplit> enumContext,
RocketMQSourceEnumState checkpoint) {
+
return new RocketMQSourceEnumerator(
topic,
consumerGroup,
nameServerAddress,
+ accessKey,
+ secretKey,
stopInMs,
startOffset,
partitionDiscoveryIntervalMs,
diff --git a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
index 61b563a..db593ec 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
@@ -18,6 +18,8 @@
package org.apache.rocketmq.flink.source.enumerator;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -31,6 +33,7 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +73,11 @@ public class RocketMQSourceEnumerator
/** The boundedness of this RocketMQSource. */
private final Boundedness boundedness;
+ /** The accessKey used for this RocketMQSource. */
+ private final String accessKey;
+ /** The secretKey used for this RocketMQSource. */
+ private final String secretKey;
+
private final SplitEnumeratorContext<RocketMQPartitionSplit> context;
// The internal states of the enumerator.
@@ -94,6 +102,8 @@ public class RocketMQSourceEnumerator
String topic,
String consumerGroup,
String nameServerAddress,
+ String accessKey,
+ String secretKey,
long stopInMs,
long startOffset,
long partitionDiscoveryIntervalMs,
@@ -103,6 +113,8 @@ public class RocketMQSourceEnumerator
topic,
consumerGroup,
nameServerAddress,
+ accessKey,
+ secretKey,
stopInMs,
startOffset,
partitionDiscoveryIntervalMs,
@@ -115,6 +127,8 @@ public class RocketMQSourceEnumerator
String topic,
String consumerGroup,
String nameServerAddress,
+ String accessKey,
+ String secretKey,
long stopInMs,
long startOffset,
long partitionDiscoveryIntervalMs,
@@ -124,6 +138,8 @@ public class RocketMQSourceEnumerator
this.topic = topic;
this.consumerGroup = consumerGroup;
this.nameServerAddress = nameServerAddress;
+ this.accessKey = accessKey;
+ this.secretKey = secretKey;
this.stopInMs = stopInMs;
this.startOffset = startOffset;
this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
@@ -303,7 +319,15 @@ public class RocketMQSourceEnumerator
private void initialRocketMQConsumer() {
try {
- consumer = new DefaultMQPullConsumer(consumerGroup);
+ if (!StringUtils.isNullOrWhitespaceOnly(accessKey)
+ && !StringUtils.isNullOrWhitespaceOnly(secretKey)) {
+ AclClientRPCHook aclClientRPCHook =
+ new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+ consumer = new DefaultMQPullConsumer(consumerGroup, aclClientRPCHook);
+ } else {
+ consumer = new DefaultMQPullConsumer(consumerGroup);
+ }
+
consumer.setNamesrvAddr(nameServerAddress);
consumer.setInstanceName(
String.join(
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
index f7af900..ad25f0e 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -18,6 +18,8 @@
package org.apache.rocketmq.flink.source.reader;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
@@ -73,6 +75,9 @@ public class RocketMQPartitionSplitReader<T>
private final long startTime;
private final long startOffset;
+ private final String accessKey;
+ private final String secretKey;
+
private final RocketMQDeserializationSchema<T> deserializationSchema;
private final Map<Tuple3<String, String, Integer>, Long> startingOffsets;
private final Map<Tuple3<String, String, Integer>, Long> stoppingTimestamps;
@@ -88,6 +93,8 @@ public class RocketMQPartitionSplitReader<T>
String topic,
String consumerGroup,
String nameServerAddress,
+ String accessKey,
+ String secretKey,
String tag,
String sql,
long stopInMs,
@@ -97,6 +104,8 @@ public class RocketMQPartitionSplitReader<T>
this.topic = topic;
this.tag = tag;
this.sql = sql;
+ this.accessKey = accessKey;
+ this.secretKey = secretKey;
this.stopInMs = stopInMs;
this.startTime = startTime;
this.startOffset = startOffset;
@@ -104,7 +113,7 @@ public class RocketMQPartitionSplitReader<T>
this.startingOffsets = new HashMap<>();
this.stoppingTimestamps = new HashMap<>();
this.collector = new SimpleCollector<>();
- initialRocketMQConsumer(consumerGroup, nameServerAddress);
+ initialRocketMQConsumer(consumerGroup, nameServerAddress, accessKey, secretKey);
}
@Override
@@ -304,9 +313,17 @@ public class RocketMQPartitionSplitReader<T>
// --------------- private helper method ----------------------
- private void initialRocketMQConsumer(String consumerGroup, String nameServerAddress) {
+ private void initialRocketMQConsumer(
+ String consumerGroup, String nameServerAddress, String accessKey, String secretKey) {
+
try {
- consumer = new DefaultMQPullConsumer(consumerGroup);
+ if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {
+ AclClientRPCHook aclClientRPCHook =
+ new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+ consumer = new DefaultMQPullConsumer(consumerGroup, aclClientRPCHook);
+ } else {
+ consumer = new DefaultMQPullConsumer(consumerGroup);
+ }
consumer.setNamesrvAddr(nameServerAddress);
consumer.setInstanceName(
String.join(
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
index 26612ea..e50b702 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
@@ -40,6 +40,7 @@ public interface RocketMQDeserializationSchema<T>
*
* @param context Contextual information that can be used during initialization.
*/
+ @Override
@PublicEvolving
default void open(InitializationContext context) {}
@@ -54,6 +55,7 @@ public interface RocketMQDeserializationSchema<T>
* @param record The MessageExts to deserialize.
* @param out The collector to put the resulting messages.
*/
+ @Override
@PublicEvolving
void deserialize(List<MessageExt> record, Collector<T> out) throws IOException;
}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
index a1bbdba..4117cc1 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
@@ -44,6 +44,7 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
import static org.apache.rocketmq.flink.common.RocketMQOptions.CONSUMER_GROUP;
import static org.apache.rocketmq.flink.common.RocketMQOptions.NAME_SERVER_ADDRESS;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ACCESS_KEY;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_COLUMN_ERROR_DEBUG;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ENCODING;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_END_TIME;
@@ -51,6 +52,7 @@ import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_FIELD_DE
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LENGTH_CHECK;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LINE_DELIMITER;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SECRET_KEY;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SQL;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_TIME;
@@ -99,6 +101,8 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
optionalOptions.add(OPTIONAL_LINE_DELIMITER);
optionalOptions.add(OPTIONAL_COLUMN_ERROR_DEBUG);
optionalOptions.add(OPTIONAL_LENGTH_CHECK);
+ optionalOptions.add(OPTIONAL_ACCESS_KEY);
+ optionalOptions.add(OPTIONAL_SECRET_KEY);
return optionalOptions;
}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
index 1c6c0a9..e75bfaa 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
@@ -62,6 +62,9 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
private final String tag;
private final String sql;
+ private final String accessKey;
+ private final String secretKey;
+
private final long stopInMs;
private final long partitionDiscoveryIntervalMs;
private final long startMessageOffset;
@@ -83,11 +86,46 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
long startTime,
long partitionDiscoveryIntervalMs,
boolean useNewApi) {
+
+ this(
+ properties,
+ schema,
+ topic,
+ consumerGroup,
+ nameServerAddress,
+ null,
+ null,
+ tag,
+ sql,
+ stopInMs,
+ startMessageOffset,
+ startTime,
+ partitionDiscoveryIntervalMs,
+ useNewApi);
+ }
+
+ public RocketMQScanTableSource(
+ DescriptorProperties properties,
+ TableSchema schema,
+ String topic,
+ String consumerGroup,
+ String nameServerAddress,
+ String accessKey,
+ String secretKey,
+ String tag,
+ String sql,
+ long stopInMs,
+ long startMessageOffset,
+ long startTime,
+ long partitionDiscoveryIntervalMs,
+ boolean useNewApi) {
this.properties = properties;
this.schema = schema;
this.topic = topic;
this.consumerGroup = consumerGroup;
this.nameServerAddress = nameServerAddress;
+ this.accessKey = accessKey;
+ this.secretKey = secretKey;
this.tag = tag;
this.sql = sql;
this.stopInMs = stopInMs;
@@ -149,6 +187,8 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
topic,
consumerGroup,
nameServerAddress,
+ accessKey,
+ secretKey,
tag,
sql,
stopInMs,