You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:39 UTC
[rocketmq-flink] 26/33: [#705] Support the implementation of new Source interface (#706)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit f882acfc1b6336ed24564bd992e15b87e01e66fd
Author: SteNicholas <pr...@163.com>
AuthorDate: Mon Apr 19 14:01:18 2021 +0800
[#705] Support the implementation of new Source interface (#706)
---
README.md | 8 +-
pom.xml | 57 ++-
.../org/apache/rocketmq/flink/RocketMQSource.java | 433 -----------------
.../org/apache/rocketmq/flink/RunningChecker.java | 33 --
.../flink/common/selector/TopicSelector.java | 28 --
.../KeyValueDeserializationSchema.java | 27 --
.../serialization/KeyValueSerializationSchema.java | 28 --
.../rocketmq/flink/common/util/MetricUtils.java | 80 ----
.../rocketmq/flink/common/util/RocketMQUtils.java | 73 ---
.../rocketmq/flink/common/util/TestUtils.java | 33 --
.../flink/{ => legacy}/RocketMQConfig.java | 70 +--
.../rocketmq/flink/{ => legacy}/RocketMQSink.java | 98 ++--
.../rocketmq/flink/legacy/RocketMQSource.java | 528 +++++++++++++++++++++
.../rocketmq/flink/legacy/RunningChecker.java | 29 ++
.../common/selector/DefaultTopicSelector.java | 39 ++
.../common/selector/SimpleTopicSelector.java | 46 +-
.../legacy/common/selector/TopicSelector.java | 24 +
.../ForwardMessageExtDeserialization.java | 14 +-
.../KeyValueDeserializationSchema.java | 23 +
.../serialization/KeyValueSerializationSchema.java | 24 +
.../MessageExtDeserializationScheme.java | 8 +-
.../SimpleKeyValueDeserializationSchema.java | 33 +-
.../SimpleKeyValueSerializationSchema.java | 28 +-
.../SimpleTupleDeserializationSchema.java | 7 +-
.../flink/legacy/common/util/MetricUtils.java | 85 ++++
.../flink/{ => legacy}/common/util/RetryUtil.java | 27 +-
.../flink/legacy/common/util/RocketMQUtils.java | 79 +++
.../flink/legacy/common/util/TestUtils.java | 29 ++
.../watermark/BoundedOutOfOrdernessGenerator.java | 18 +-
.../BoundedOutOfOrdernessGeneratorPerQueue.java | 27 +-
.../common/watermark/PunctuatedAssigner.java | 23 +-
.../watermark/TimeLagWatermarkGenerator.java | 19 +-
.../common/watermark/WaterMarkForAll.java | 10 +-
.../common/watermark/WaterMarkPerQueue.java | 18 +-
.../{ => legacy}/example/RocketMQFlinkExample.java | 46 +-
.../flink/{ => legacy}/example/SimpleConsumer.java | 31 +-
.../flink/{ => legacy}/example/SimpleProducer.java | 14 +-
.../{ => legacy}/function/SinkMapFunction.java | 14 +-
.../{ => legacy}/function/SourceMapFunction.java | 9 +-
.../rocketmq/flink/source/RocketMQSource.java | 175 +++++++
.../enumerator/RocketMQSourceEnumState.java} | 33 +-
.../RocketMQSourceEnumStateSerializer.java | 64 +++
.../enumerator/RocketMQSourceEnumerator.java | 337 +++++++++++++
.../reader/RocketMQPartitionSplitReader.java | 373 +++++++++++++++
.../flink/source/reader/RocketMQRecordEmitter.java | 39 ++
.../flink/source/reader/RocketMQSourceReader.java | 64 +++
.../RocketMQRecordDeserializationSchema.java | 43 ++
.../flink/source/split/RocketMQPartitionSplit.java | 100 ++++
.../split/RocketMQPartitionSplitSerializer.java | 66 +++
.../source/split/RocketMQPartitionSplitState.java | 57 +++
.../apache/rocketmq/flink/RocketMQSinkTest.java | 70 ---
.../common/selector/DefaultTopicSelectorTest.java | 37 --
.../common/selector/SimpleTopicSelectorTest.java | 49 --
.../SimpleKeyValueSerializationSchemaTest.java | 42 --
.../rocketmq/flink/legacy/RocketMQSinkTest.java | 70 +++
.../flink/{ => legacy}/RocketMQSourceTest.java | 52 +-
.../common/selector/DefaultTopicSelectorTest.java | 32 ++
.../common/selector/SimpleTopicSelectorTest.java | 44 ++
.../SimpleKeyValueSerializationSchemaTest.java | 42 ++
.../RocketMQSourceEnumStateSerializerTest.java | 84 ++++
.../source/reader/RocketMQRecordEmitterTest.java | 97 ++++
.../RocketMQPartitionSplitSerializerTest.java | 44 ++
62 files changed, 2979 insertions(+), 1255 deletions(-)
diff --git a/README.md b/README.md
index 600c57a..97cb1f9 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,7 @@ The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly
Otherwise, the source doesn't provide any reliability guarantees.
### KeyValueDeserializationSchema
-The main API for deserializing topic and tags is the `org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema` interface.
+The main API for deserializing topic and tags is the `org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema` interface.
`rocketmq-flink` includes general purpose `KeyValueDeserializationSchema` implementations called `SimpleKeyValueDeserializationSchema`.
```java
@@ -26,7 +26,7 @@ Otherwise, the sink reliability guarantees depends on rocketmq producer's retry
but you can change it by invoking `withAsync(true)`.
### KeyValueSerializationSchema
-The main API for serializing topic and tags is the `org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema` interface.
+The main API for serializing topic and tags is the `org.apache.rocketmq.flink.legacy.common.serialization.KeyValueSerializationSchema` interface.
`rocketmq-flink` includes general purpose `KeyValueSerializationSchema` implementations called `SimpleKeyValueSerializationSchema`.
```java
@@ -39,7 +39,7 @@ public interface KeyValueSerializationSchema<T> extends Serializable {
```
### TopicSelector
-The main API for selecting topic and tags is the `org.apache.rocketmq.flink.common.selector.TopicSelector` interface.
+The main API for selecting topic and tags is the `org.apache.rocketmq.flink.legacy.common.selector.TopicSelector` interface.
`rocketmq-flink` includes general purpose `TopicSelector` implementations called `DefaultTopicSelector` and `SimpleTopicSelector`.
```java
@@ -96,7 +96,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
```
## Configurations
-The following configurations are all from the class `org.apache.rocketmq.flink.RocketMQConfig`.
+The following configurations are all from the class `org.apache.rocketmq.flink.legacy.RocketMQConfig`.
### Producer Configurations
| NAME | DESCRIPTION | DEFAULT |
diff --git a/pom.xml b/pom.xml
index 2e19ce5..d5fc49f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,9 +34,10 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<rocketmq.version>4.7.1</rocketmq.version>
- <flink.version>1.10.1</flink.version>
+ <flink.version>1.12.2</flink.version>
<commons-lang.version>2.5</commons-lang.version>
<scala.binary.version>2.11</scala.binary.version>
+ <spotless.version>2.4.2</spotless.version>
</properties>
<dependencies>
@@ -57,6 +58,26 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_2.11</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
@@ -149,7 +170,7 @@
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.rocketmq.flink.example.RocketMQFlinkExample</mainClass>
+ <mainClass>org.apache.rocketmq.flink.legacy.example.RocketMQFlinkExample</mainClass>
</transformer>
</transformers>
</configuration>
@@ -218,6 +239,38 @@
<locale>en</locale>
</configuration>
</plugin>
+ <!-- Due to the Flink build setup, "mvn spotless:apply" and "mvn spotless:check"
+ don't work. You have to use the fully qualified name, i.e.
+ "mvn com.diffplug.spotless:spotless-maven-plugin:apply" -->
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>${spotless.version}</version>
+ <configuration>
+ <java>
+ <googleJavaFormat>
+ <version>1.7</version>
+ <style>AOSP</style>
+ </googleJavaFormat>
+
+ <!-- \# refers to the static imports -->
+ <importOrder>
+ <order>org.apache.rocketmq,org.apache.flink,org.apache.flink.shaded,,javax,java,scala,\#</order>
+ </importOrder>
+
+ <removeUnusedImports />
+ </java>
+ </configuration>
+ <executions>
+ <execution>
+ <id>spotless-check</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
deleted file mode 100644
index 72783a8..0000000
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.rocketmq.flink;
-
-import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang.Validate;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.PullResult;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema;
-import org.apache.rocketmq.flink.common.util.MetricUtils;
-import org.apache.rocketmq.flink.common.util.RetryUtil;
-import org.apache.rocketmq.flink.common.util.RocketMQUtils;
-import org.apache.rocketmq.flink.common.watermark.WaterMarkForAll;
-import org.apache.rocketmq.flink.common.watermark.WaterMarkPerQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.management.ManagementFactory;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.rocketmq.flink.RocketMQConfig.*;
-import static org.apache.rocketmq.flink.common.util.RocketMQUtils.getInteger;
-import static org.apache.rocketmq.flink.common.util.RocketMQUtils.getLong;
-
-/**
- * The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly once reliability guarantees when
- * checkpoints are enabled. Otherwise, the source doesn't provide any reliability guarantees.
- */
-public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
- implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger log = LoggerFactory.getLogger(RocketMQSource.class);
- private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
- private RunningChecker runningChecker;
- private transient DefaultMQPullConsumer consumer;
- private KeyValueDeserializationSchema<OUT> schema;
- private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
- private Map<MessageQueue, Long> offsetTable;
- private Map<MessageQueue, Long> restoredOffsets;
- private List<MessageQueue> messageQueues;
- private ExecutorService executor;
-
- // watermark in source
- private WaterMarkPerQueue waterMarkPerQueue;
- private WaterMarkForAll waterMarkForAll;
-
- private ScheduledExecutorService timer;
- /**
- * Data for pending but uncommitted offsets.
- */
- private LinkedMap pendingOffsetsToCommit;
- private Properties props;
- private String topic;
- private String group;
- private transient volatile boolean restored;
- private transient boolean enableCheckpoint;
- private volatile Object checkPointLock;
-
- private Meter tpsMetric;
-
- public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
- this.schema = schema;
- this.props = props;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- log.debug("source open....");
- Validate.notEmpty(props, "Consumer properties can not be empty");
-
- this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
- this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
-
- Validate.notEmpty(topic, "Consumer topic can not be empty");
- Validate.notEmpty(group, "Consumer group can not be empty");
-
- this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
-
- if (offsetTable == null) {
- offsetTable = new ConcurrentHashMap<>();
- }
- if (restoredOffsets == null) {
- restoredOffsets = new ConcurrentHashMap<>();
- }
-
- //use restoredOffsets to init offset table.
- initOffsetTableFromRestoredOffsets();
-
- if (pendingOffsetsToCommit == null) {
- pendingOffsetsToCommit = new LinkedMap();
- }
- if (checkPointLock == null) {
- checkPointLock = new ReentrantLock();
- }
- if (waterMarkPerQueue == null) {
- waterMarkPerQueue = new WaterMarkPerQueue(5000);
- }
- if (waterMarkForAll == null) {
- waterMarkForAll = new WaterMarkForAll(5000);
- }
- if (timer == null) {
- timer = Executors.newSingleThreadScheduledExecutor();
- }
-
- runningChecker = new RunningChecker();
- runningChecker.setRunning(true);
-
- final ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setDaemon(true).setNameFormat("rmq-pull-thread-%d").build();
- executor = Executors.newCachedThreadPool(threadFactory);
-
- int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
- consumer = new DefaultMQPullConsumer(group, RocketMQConfig.buildAclRPCHook(props));
- RocketMQConfig.buildConsumerConfigs(props, consumer);
-
- // set unique instance name, avoid exception: https://help.aliyun.com/document_detail/29646.html
- String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
- String instanceName = RocketMQUtils.getInstanceName(runtimeName, topic, group,
- String.valueOf(indexOfThisSubTask), String.valueOf(System.nanoTime()));
- consumer.setInstanceName(instanceName);
- consumer.start();
-
- Counter outputCounter = getRuntimeContext().getMetricGroup()
- .counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter());
- tpsMetric = getRuntimeContext().getMetricGroup()
- .meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60));
- }
-
- @Override
- public void run(SourceContext context) throws Exception {
- String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
- int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE);
-
- final RuntimeContext ctx = getRuntimeContext();
- // The lock that guarantees that record emission and state updates are atomic,
- // from the view of taking a checkpoint.
- int taskNumber = ctx.getNumberOfParallelSubtasks();
- int taskIndex = ctx.getIndexOfThisSubtask();
- log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}", taskNumber, taskIndex);
-
-
- timer.scheduleAtFixedRate(() -> {
- // context.emitWatermark(waterMarkPerQueue.getCurrentWatermark());
- context.emitWatermark(waterMarkForAll.getCurrentWatermark());
- }, 5, 5, TimeUnit.SECONDS);
-
- Collection<MessageQueue> totalQueues = consumer.fetchSubscribeMessageQueues(topic);
- messageQueues = RocketMQUtils.allocate(totalQueues, taskNumber, ctx.getIndexOfThisSubtask());
- for (MessageQueue mq : messageQueues) {
- this.executor.execute(() -> {
- RetryUtil.call(() -> {
- while (runningChecker.isRunning()) {
- try {
- long offset = getMessageQueueOffset(mq);
- PullResult pullResult = consumer.pullBlockIfNotFound(mq, tag, offset, pullBatchSize);
-
- boolean found = false;
- switch (pullResult.getPullStatus()) {
- case FOUND:
- List<MessageExt> messages = pullResult.getMsgFoundList();
- for (MessageExt msg : messages) {
- byte[] key = msg.getKeys() != null ? msg.getKeys().getBytes(StandardCharsets.UTF_8) : null;
- byte[] value = msg.getBody();
- OUT data = schema.deserializeKeyAndValue(key, value);
-
- // output and state update are atomic
- synchronized (checkPointLock) {
- log.debug(msg.getMsgId() + "_" + msg.getBrokerName() + " " + msg.getQueueId() + " " + msg.getQueueOffset());
- context.collectWithTimestamp(data, msg.getBornTimestamp());
-
- // update max eventTime per queue
- // waterMarkPerQueue.extractTimestamp(mq, msg.getBornTimestamp());
- waterMarkForAll.extractTimestamp(msg.getBornTimestamp());
- tpsMetric.markEvent();
- }
- }
- found = true;
- break;
- case NO_MATCHED_MSG:
- log.debug("No matched message after offset {} for queue {}", offset, mq);
- break;
- case NO_NEW_MSG:
- log.debug("No new message after offset {} for queue {}", offset, mq);
- break;
- case OFFSET_ILLEGAL:
- log.warn("Offset {} is illegal for queue {}", offset, mq);
- break;
- default:
- break;
- }
-
- synchronized (checkPointLock) {
- updateMessageQueueOffset(mq, pullResult.getNextBeginOffset());
- }
-
- if (!found) {
- RetryUtil.waitForMs(RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- return true;
- }, "RuntimeException");
- });
- }
-
- awaitTermination();
- }
-
- private void awaitTermination() throws InterruptedException {
- while (runningChecker.isRunning()) {
- Thread.sleep(50);
- }
- }
-
- private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
- Long offset = offsetTable.get(mq);
- // restoredOffsets(unionOffsetStates) is the restored global union state;
- // should only snapshot mqs that actually belong to us
- if (offset == null) {
- // fetchConsumeOffset from broker
- offset = consumer.fetchConsumeOffset(mq, false);
- if (!restored || offset < 0) {
- String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
- switch (initialOffset) {
- case CONSUMER_OFFSET_EARLIEST:
- offset = consumer.minOffset(mq);
- break;
- case CONSUMER_OFFSET_LATEST:
- offset = consumer.maxOffset(mq);
- break;
- case CONSUMER_OFFSET_TIMESTAMP:
- offset = consumer.searchOffset(mq, getLong(props,
- RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis()));
- break;
- default:
- throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO.");
- }
- }
- }
- offsetTable.put(mq, offset);
- return offsetTable.get(mq);
- }
-
- private void updateMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
- offsetTable.put(mq, offset);
- if (!enableCheckpoint) {
- consumer.updateConsumeOffset(mq, offset);
- }
- }
-
- @Override
- public void cancel() {
- log.debug("cancel ...");
- runningChecker.setRunning(false);
-
- if (consumer != null) {
- consumer.shutdown();
- }
-
- if (offsetTable != null) {
- offsetTable.clear();
- }
- if (restoredOffsets != null) {
- restoredOffsets.clear();
- }
- if (pendingOffsetsToCommit != null) {
- pendingOffsetsToCommit.clear();
- }
- }
-
- @Override
- public void close() throws Exception {
- log.debug("close ...");
- // pretty much the same logic as cancelling
- try {
- cancel();
- } finally {
- super.close();
- }
- }
-
- public void initOffsetTableFromRestoredOffsets() {
- Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
- restoredOffsets.forEach((mq, offset) -> {
- if (!offsetTable.containsKey(mq) || offsetTable.get(mq) < offset) {
- offsetTable.put(mq, offset);
- }
- });
- log.info("init offset table from restoredOffsets successful.", offsetTable);
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- // called when a snapshot for a checkpoint is requested
- log.info("Snapshotting state {} ...", context.getCheckpointId());
- if (!runningChecker.isRunning()) {
- log.info("snapshotState() called on closed source; returning null.");
- return;
- }
-
- // Discovery topic Route change when snapshot
- RetryUtil.call(() -> {
- Collection<MessageQueue> totalQueues = consumer.fetchSubscribeMessageQueues(topic);
- int taskNumber = getRuntimeContext().getNumberOfParallelSubtasks();
- int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
- List<MessageQueue> newQueues = RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
- Collections.sort(newQueues);
- log.debug(taskIndex + " Topic route is same.");
- if (!messageQueues.equals(newQueues)) {
- throw new RuntimeException();
- }
- return true;
- }, "RuntimeException due to topic route changed");
-
- unionOffsetStates.clear();
- HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());
- for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
- unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
- currentOffsets.put(entry.getKey(), entry.getValue());
- }
- pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
- log.info("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
- offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
- }
-
- /**
- * called every time the user-defined function is initialized,
- * be that when the function is first initialized or be that
- * when the function is actually recovering from an earlier checkpoint.
- * Given this, initializeState() is not only the place where different types of state are initialized,
- * but also where state recovery logic is included.
- */
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- log.info("initialize State ...");
-
- this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
- OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {
- })));
- this.restored = context.isRestored();
-
- if (restored) {
- if (restoredOffsets == null) {
- restoredOffsets = new ConcurrentHashMap<>();
- }
- for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
- if (!restoredOffsets.containsKey(mqOffsets.f0) || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
- restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
- }
- }
- log.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets);
- } else {
- log.info("No restore state for the consumer.");
- }
- }
-
- @Override
- public TypeInformation getProducedType() {
- return schema.getProducedType();
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- // callback when checkpoint complete
- if (!runningChecker.isRunning()) {
- log.info("notifyCheckpointComplete() called on closed source; returning null.");
- return;
- }
-
- final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
- if (posInMap == -1) {
- log.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
- return;
- }
-
- Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap);
-
- // remove older checkpoints in map
- for (int i = 0; i < posInMap; i++) {
- pendingOffsetsToCommit.remove(0);
- }
-
- if (offsets == null || offsets.size() == 0) {
- log.debug("Checkpoint state was empty.");
- return;
- }
-
- for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
- consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
- }
- }
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/RunningChecker.java b/src/main/java/org/apache/rocketmq/flink/RunningChecker.java
deleted file mode 100644
index b7bc2b9..0000000
--- a/src/main/java/org/apache/rocketmq/flink/RunningChecker.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink;
-
-import java.io.Serializable;
-
-public class RunningChecker implements Serializable {
- private volatile boolean isRunning = false;
-
- public boolean isRunning() {
- return isRunning;
- }
-
- public void setRunning(boolean running) {
- isRunning = running;
- }
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java b/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java
deleted file mode 100644
index 2a347db..0000000
--- a/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.selector;
-
-import java.io.Serializable;
-
-public interface TopicSelector<T> extends Serializable {
-
- String getTopic(T tuple);
-
- String getTag(T tuple);
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java
deleted file mode 100644
index d8759f9..0000000
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.serialization;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T>, Serializable {
- T deserializeKeyAndValue(byte[] key, byte[] value);
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java
deleted file mode 100644
index d847e8a..0000000
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.serialization;
-
-import java.io.Serializable;
-
-public interface KeyValueSerializationSchema<T> extends Serializable {
-
- byte[] serializeKey(T tuple);
-
- byte[] serializeValue(T tuple);
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/util/MetricUtils.java b/src/main/java/org/apache/rocketmq/flink/common/util/MetricUtils.java
deleted file mode 100644
index 764d01f..0000000
--- a/src/main/java/org/apache/rocketmq/flink/common/util/MetricUtils.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.util;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.SimpleCounter;
-
-/**
- * RocketMQ connector metrics.
- */
-public class MetricUtils {
-
- public static final String METRICS_TPS = "tps";
-
- private static final String METRIC_GROUP_SINK = "sink";
- private static final String METRICS_SINK_IN_TPS = "inTps";
- private static final String METRICS_SINK_OUT_TPS = "outTps";
- private static final String METRICS_SINK_OUT_BPS = "outBps";
- private static final String METRICS_SINK_OUT_Latency = "outLatency";
-
- public static Meter registerSinkInTps(RuntimeContext context) {
- Counter parserCounter = context.getMetricGroup().addGroup(METRIC_GROUP_SINK)
- .counter(METRICS_SINK_IN_TPS + "_counter", new SimpleCounter());
- return context.getMetricGroup().addGroup(METRIC_GROUP_SINK)
- .meter(METRICS_SINK_IN_TPS, new MeterView(parserCounter, 60));
- }
-
- public static Meter registerOutTps(RuntimeContext context) {
- Counter parserCounter = context.getMetricGroup().addGroup(METRIC_GROUP_SINK)
- .counter(METRICS_SINK_OUT_TPS + "_counter", new SimpleCounter());
- return context.getMetricGroup().addGroup(METRIC_GROUP_SINK)
- .meter(METRICS_SINK_OUT_TPS, new MeterView(parserCounter, 60));
- }
-
- public static Meter registerOutBps(RuntimeContext context) {
- Counter bpsCounter = context.getMetricGroup().addGroup(METRIC_GROUP_SINK)
- .counter(METRICS_SINK_OUT_BPS + "_counter", new SimpleCounter());
- return context.getMetricGroup().addGroup(METRIC_GROUP_SINK)
- .meter(METRICS_SINK_OUT_BPS, new MeterView(bpsCounter, 60));
- }
-
- public static LatencyGauge registerOutLatency(RuntimeContext context) {
- return context.getMetricGroup().addGroup(METRIC_GROUP_SINK).gauge(METRICS_SINK_OUT_Latency, new LatencyGauge());
- }
-
- public static class LatencyGauge implements Gauge<Double> {
- private double value;
-
- public void report(long timeDelta, long batchSize) {
- if (batchSize != 0) {
- this.value = (1.0 * timeDelta) / batchSize;
- }
- }
-
- @Override
- public Double getValue() {
- return value;
- }
- }
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/util/RocketMQUtils.java b/src/main/java/org/apache/rocketmq/flink/common/util/RocketMQUtils.java
deleted file mode 100644
index fc37b04..0000000
--- a/src/main/java/org/apache/rocketmq/flink/common/util/RocketMQUtils.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.util;
-
-import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.common.message.MessageQueue;
-
-import java.lang.management.ManagementFactory;
-import java.util.*;
-
-public final class RocketMQUtils {
-
- public static int getInteger(Properties props, String key, int defaultValue) {
- return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
- }
-
- public static long getLong(Properties props, String key, long defaultValue) {
- return Long.parseLong(props.getProperty(key, String.valueOf(defaultValue)));
- }
-
- public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
- return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
- }
-
- public static AccessChannel getAccessChannel(Properties props, String key, AccessChannel defaultValue) {
- return AccessChannel.valueOf(props.getProperty(key, String.valueOf(defaultValue)));
- }
-
- public static String getInstanceName(String... args) {
- if (null != args && args.length > 0) {
- return String.join("_", args);
- }
- return ManagementFactory.getRuntimeMXBean().getName() + "_" + System.nanoTime();
- }
-
- /**
- * Average Hashing queue algorithm
- * Refer: org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely
- */
- public static List<MessageQueue> allocate(Collection<MessageQueue> mqSet,
- int numberOfParallelTasks,
- int indexOfThisTask) {
- ArrayList<MessageQueue> mqAll = new ArrayList<>(mqSet);
- Collections.sort(mqAll);
- List<MessageQueue> result = new ArrayList<>();
- int mod = mqAll.size() % numberOfParallelTasks;
- int averageSize = mqAll.size() <= numberOfParallelTasks ? 1 : (mod > 0 && indexOfThisTask < mod ?
- mqAll.size() / numberOfParallelTasks + 1 : mqAll.size() / numberOfParallelTasks);
- int startIndex = (mod > 0 && indexOfThisTask < mod) ? indexOfThisTask * averageSize :
- indexOfThisTask * averageSize + mod;
- int range = Math.min(averageSize, mqAll.size() - startIndex);
- for (int i = 0; i < range; i++) {
- result.add(mqAll.get((startIndex + i) % mqAll.size()));
- }
- return result;
- }
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/util/TestUtils.java b/src/main/java/org/apache/rocketmq/flink/common/util/TestUtils.java
deleted file mode 100644
index 71d1265..0000000
--- a/src/main/java/org/apache/rocketmq/flink/common/util/TestUtils.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.util;
-
-import java.lang.reflect.Field;
-
-public class TestUtils {
- public static void setFieldValue(Object obj, String fieldName, Object value) {
- try {
- Field field = obj.getClass().getDeclaredField(fieldName);
- field.setAccessible(true);
- field.set(obj, value);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
similarity index 77%
rename from src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
index c1bad2d..5c19b7a 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
@@ -1,25 +1,19 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.flink.legacy;
-package org.apache.rocketmq.flink;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
@@ -28,15 +22,16 @@ import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
+
import java.util.Properties;
import java.util.UUID;
-import static org.apache.rocketmq.flink.common.util.RocketMQUtils.getAccessChannel;
-import static org.apache.rocketmq.flink.common.util.RocketMQUtils.getInteger;
+import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getAccessChannel;
+import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getInteger;
-/**
- * RocketMQConfig for Consumer/Producer.
- */
+/** RocketMQConfig for Consumer/Producer. */
public class RocketMQConfig {
// Server Config
public static final String NAME_SERVER_ADDR = "nameserver.address"; // Required
@@ -77,13 +72,15 @@ public class RocketMQConfig {
public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
public static final String CONSUMER_OFFSET_FROM_TIMESTAMP = "consumer.offset.from.timestamp";
- public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
+ public static final String CONSUMER_OFFSET_PERSIST_INTERVAL =
+ "consumer.offset.persist.interval";
public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds
public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size";
public static final int DEFAULT_CONSUMER_BATCH_SIZE = 32;
- public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = "consumer.delay.when.message.not.found";
+ public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND =
+ "consumer.delay.when.message.not.found";
public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 100;
public static final String CONSUMER_INDEX_OF_THIS_SUB_TASK = "consumer.index";
@@ -116,6 +113,7 @@ public class RocketMQConfig {
/**
* Build Producer Configs.
+ *
* @param props Properties
* @param producer DefaultMQProducer
*/
@@ -126,27 +124,32 @@ public class RocketMQConfig {
group = UUID.randomUUID().toString();
}
producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, group));
- producer.setRetryTimesWhenSendFailed(getInteger(props, PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
- producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
- PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+ producer.setRetryTimesWhenSendFailed(
+ getInteger(props, PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+ producer.setRetryTimesWhenSendAsyncFailed(
+ getInteger(props, PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
producer.setSendMsgTimeout(getInteger(props, PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
-
}
/**
* Build Consumer Configs.
+ *
* @param props Properties
* @param consumer DefaultMQPullConsumer
*/
public static void buildConsumerConfigs(Properties props, DefaultMQPullConsumer consumer) {
buildCommonConfigs(props, consumer);
consumer.setMessageModel(MessageModel.CLUSTERING);
- consumer.setPersistConsumerOffsetInterval(getInteger(props,
- CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
+ consumer.setPersistConsumerOffsetInterval(
+ getInteger(
+ props,
+ CONSUMER_OFFSET_PERSIST_INTERVAL,
+ DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
}
/**
* Build Common Configs.
+ *
* @param props Properties
* @param client ClientConfig
*/
@@ -154,8 +157,8 @@ public class RocketMQConfig {
String nameServers = props.getProperty(NAME_SERVER_ADDR);
Validate.notEmpty(nameServers);
client.setNamesrvAddr(nameServers);
- client.setHeartbeatBrokerInterval(getInteger(props,
- BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
+ client.setHeartbeatBrokerInterval(
+ getInteger(props, BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
// When using aliyun products, you need to set up channels
client.setAccessChannel((getAccessChannel(props, ACCESS_CHANNEL, DEFAULT_ACCESS_CHANNEL)));
client.setUnitName(props.getProperty(UNIT_NAME, null));
@@ -163,6 +166,7 @@ public class RocketMQConfig {
/**
* Build credentials for client.
+ *
* @param props
* @return
*/
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
similarity index 69%
rename from src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
index 865af75..f91a684 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
@@ -1,24 +1,28 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.flink.legacy;
-package org.apache.rocketmq.flink;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.flink.legacy.common.util.MetricUtils;
+import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.commons.lang.Validate;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -26,15 +30,8 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.flink.common.util.MetricUtils;
-import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import org.apache.commons.lang.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,9 +41,9 @@ import java.util.Properties;
import java.util.UUID;
/**
- * The RocketMQSink provides at-least-once reliability guarantees when
- * checkpoints are enabled and batchFlushOnCheckpoint(true) is set.
- * Otherwise, the sink reliability guarantees depends on rocketmq producer's retry policy.
+ * The RocketMQSink provides at-least-once reliability guarantees when checkpoints are enabled and
+ * batchFlushOnCheckpoint(true) is set. Otherwise, the sink reliability guarantees depends on
+ * rocketmq producer's retry policy.
*/
public class RocketMQSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
@@ -78,14 +75,17 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
// with authentication hook
producer = new DefaultMQProducer(RocketMQConfig.buildAclRPCHook(props));
- producer.setInstanceName(getRuntimeContext().getIndexOfThisSubtask() + "_" + UUID.randomUUID());
+ producer.setInstanceName(
+ getRuntimeContext().getIndexOfThisSubtask() + "_" + UUID.randomUUID());
RocketMQConfig.buildProducerConfigs(props, producer);
batchList = new LinkedList<>();
- if (batchFlushOnCheckpoint && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
- LOG.info("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
+ if (batchFlushOnCheckpoint
+ && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
+ LOG.info(
+ "Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
batchFlushOnCheckpoint = false;
}
@@ -117,23 +117,25 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
long timeStartWriting = System.currentTimeMillis();
if (async) {
try {
- producer.send(msg, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- LOG.debug("Async send message success! result: {}", sendResult);
- long end = System.currentTimeMillis();
- latencyGauge.report(end - timeStartWriting, 1);
- outTps.markEvent();
- outBps.markEvent(msg.getBody().length);
- }
-
- @Override
- public void onException(Throwable throwable) {
- if (throwable != null) {
- LOG.error("Async send message failure!", throwable);
- }
- }
- });
+ producer.send(
+ msg,
+ new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ LOG.debug("Async send message success! result: {}", sendResult);
+ long end = System.currentTimeMillis();
+ latencyGauge.report(end - timeStartWriting, 1);
+ outTps.markEvent();
+ outBps.markEvent(msg.getBody().length);
+ }
+
+ @Override
+ public void onException(Throwable throwable) {
+ if (throwable != null) {
+ LOG.error("Async send message failure!", throwable);
+ }
+ }
+ });
} catch (Exception e) {
LOG.error("Async send message failure!", e);
}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java
new file mode 100644
index 0000000..84260b6
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema;
+import org.apache.rocketmq.flink.legacy.common.util.MetricUtils;
+import org.apache.rocketmq.flink.legacy.common.util.RetryUtil;
+import org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils;
+import org.apache.rocketmq.flink.legacy.common.watermark.WaterMarkForAll;
+import org.apache.rocketmq.flink.legacy.common.watermark.WaterMarkPerQueue;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.curator4.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.lang.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_BATCH_SIZE;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_EARLIEST;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE;
+import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getInteger;
+import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getLong;
+
+/**
+ * The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly once reliability
+ * guarantees when checkpoints are enabled. Otherwise, the source doesn't provide any reliability
+ * guarantees.
+ */
+public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
+ implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger log = LoggerFactory.getLogger(RocketMQSource.class);
+    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
+    // Shared running flag: set in open(), polled by the pull loops and awaitTermination(),
+    // cleared in cancel().
+    private RunningChecker runningChecker;
+    // Pull consumer; created, configured and started in open().
+    private transient DefaultMQPullConsumer consumer;
+    // Turns a message's key bytes and body bytes into the emitted record.
+    private KeyValueDeserializationSchema<OUT> schema;
+    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
+    // Offset at which the next pull of each queue starts; updated after every pull.
+    private Map<MessageQueue, Long> offsetTable;
+    // NOTE(review): presumably populated from checkpoint state outside this view - confirm.
+    private Map<MessageQueue, Long> restoredOffsets;
+    // Queues allocated to this subtask in run().
+    private List<MessageQueue> messageQueues;
+    // Thread pool running one pull loop per allocated queue.
+    private ExecutorService executor;
+
+    // watermark in source
+    private WaterMarkPerQueue waterMarkPerQueue;
+    private WaterMarkForAll waterMarkForAll;
+
+    // Schedules the periodic watermark emission started in run().
+    private ScheduledExecutorService timer;
+    /** Data for pending but uncommitted offsets. */
+    private LinkedMap pendingOffsetsToCommit;
+
+    private Properties props;
+    private String topic;
+    private String group;
+    // Read by getMessageQueueOffset(): when false, the configured reset policy always applies.
+    private transient volatile boolean restored;
+    // Set in open(); when false, offsets are handed to the consumer after every pull.
+    private transient boolean enableCheckpoint;
+    // Monitor guarding record emission and offset updates in run(). It is instantiated as a
+    // ReentrantLock in open(), but the visible code only uses it through synchronized blocks.
+    private volatile Object checkPointLock;
+
+    // Meter marked once per emitted record.
+    private Meter tpsMetric;
+
+    /**
+     * Creates a source that pulls from RocketMQ using the given consumer settings.
+     *
+     * @param schema converts each message's key and body bytes into an output record
+     * @param props consumer properties; topic and group are validated later, in {@code open}
+     */
+    public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
+        this.props = props;
+        this.schema = schema;
+    }
+
+    /**
+     * Prepares this subtask for consumption: validates the consumer properties, creates the
+     * bookkeeping structures that are still {@code null}, starts the pull consumer and registers
+     * the throughput metric.
+     *
+     * @param parameters Flink configuration; not read by this method
+     * @throws Exception if validation fails or the consumer cannot be started
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        log.debug("source open....");
+        Validate.notEmpty(props, "Consumer properties can not be empty");
+
+        this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
+        this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
+
+        Validate.notEmpty(topic, "Consumer topic can not be empty");
+        Validate.notEmpty(group, "Consumer group can not be empty");
+
+        // Remembered so updateMessageQueueOffset() knows whether to hand offsets to the consumer
+        // after every pull.
+        this.enableCheckpoint =
+                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
+
+        // Both maps must exist before initOffsetTableFromRestoredOffsets() runs.
+        if (offsetTable == null) {
+            offsetTable = new ConcurrentHashMap<>();
+        }
+        if (restoredOffsets == null) {
+            restoredOffsets = new ConcurrentHashMap<>();
+        }
+
+        // use restoredOffsets to init offset table.
+        initOffsetTableFromRestoredOffsets();
+
+        if (pendingOffsetsToCommit == null) {
+            pendingOffsetsToCommit = new LinkedMap();
+        }
+        // Only used as a monitor object by the synchronized blocks in run().
+        if (checkPointLock == null) {
+            checkPointLock = new ReentrantLock();
+        }
+        // NOTE(review): 5000 is presumably a delay in milliseconds - confirm against the
+        // WaterMarkPerQueue / WaterMarkForAll constructors.
+        if (waterMarkPerQueue == null) {
+            waterMarkPerQueue = new WaterMarkPerQueue(5000);
+        }
+        if (waterMarkForAll == null) {
+            waterMarkForAll = new WaterMarkForAll(5000);
+        }
+        if (timer == null) {
+            timer = Executors.newSingleThreadScheduledExecutor();
+        }
+
+        runningChecker = new RunningChecker();
+        runningChecker.setRunning(true);
+
+        // Daemon threads, so pull loops never keep the JVM alive on their own.
+        final ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("rmq-pull-thread-%d")
+                        .build();
+        executor = Executors.newCachedThreadPool(threadFactory);
+
+        int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
+        consumer = new DefaultMQPullConsumer(group, RocketMQConfig.buildAclRPCHook(props));
+        RocketMQConfig.buildConsumerConfigs(props, consumer);
+
+        // set unique instance name, avoid exception:
+        // https://help.aliyun.com/document_detail/29646.html
+        String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
+        String instanceName =
+                RocketMQUtils.getInstanceName(
+                        runtimeName,
+                        topic,
+                        group,
+                        String.valueOf(indexOfThisSubTask),
+                        String.valueOf(System.nanoTime()));
+        consumer.setInstanceName(instanceName);
+        consumer.start();
+
+        // The meter is derived from the counter registered just above it; run() marks it once per
+        // emitted record.
+        Counter outputCounter =
+                getRuntimeContext()
+                        .getMetricGroup()
+                        .counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter());
+        tpsMetric =
+                getRuntimeContext()
+                        .getMetricGroup()
+                        .meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60));
+    }
+
+    /**
+     * Pulls messages from the queues allocated to this subtask and emits them downstream.
+     *
+     * <p>The queues of the topic are split across the parallel subtasks, and one pull loop per
+     * allocated queue is submitted to the executor. Each loop pulls a batch, deserializes every
+     * message, emits it with its born timestamp and then advances the queue's offset. A timer
+     * emits the current watermark every five seconds. The method returns only after the running
+     * flag has been cleared.
+     *
+     * <p>After a pull that found no messages, the loop waits for the number of milliseconds
+     * configured under {@link RocketMQConfig#CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND}, falling back
+     * to {@link RocketMQConfig#DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND} when it is unset.
+     *
+     * @param context source context used to emit records and watermarks
+     * @throws Exception if the topic's queues cannot be fetched or the wait is interrupted
+     */
+    @Override
+    public void run(SourceContext context) throws Exception {
+        String tag =
+                props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
+        int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE);
+        // Honour the user-supplied back-off instead of always using the built-in default.
+        int delayWhenMessageNotFound =
+                getInteger(
+                        props,
+                        RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND,
+                        RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
+
+        final RuntimeContext ctx = getRuntimeContext();
+        int taskNumber = ctx.getNumberOfParallelSubtasks();
+        int taskIndex = ctx.getIndexOfThisSubtask();
+        log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}", taskNumber, taskIndex);
+
+        timer.scheduleAtFixedRate(
+                () -> {
+                    // context.emitWatermark(waterMarkPerQueue.getCurrentWatermark());
+                    context.emitWatermark(waterMarkForAll.getCurrentWatermark());
+                },
+                5,
+                5,
+                TimeUnit.SECONDS);
+
+        Collection<MessageQueue> totalQueues = consumer.fetchSubscribeMessageQueues(topic);
+        messageQueues = RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
+        for (MessageQueue mq : messageQueues) {
+            this.executor.execute(
+                    () -> {
+                        RetryUtil.call(
+                                () -> {
+                                    while (runningChecker.isRunning()) {
+                                        try {
+                                            long offset = getMessageQueueOffset(mq);
+                                            PullResult pullResult =
+                                                    consumer.pullBlockIfNotFound(
+                                                            mq, tag, offset, pullBatchSize);
+
+                                            boolean found = false;
+                                            switch (pullResult.getPullStatus()) {
+                                                case FOUND:
+                                                    List<MessageExt> messages =
+                                                            pullResult.getMsgFoundList();
+                                                    for (MessageExt msg : messages) {
+                                                        byte[] key =
+                                                                msg.getKeys() != null
+                                                                        ? msg.getKeys()
+                                                                                .getBytes(
+                                                                                        StandardCharsets
+                                                                                                .UTF_8)
+                                                                        : null;
+                                                        byte[] value = msg.getBody();
+                                                        OUT data =
+                                                                schema.deserializeKeyAndValue(
+                                                                        key, value);
+
+                                                        // The lock keeps record emission and the
+                                                        // bookkeeping below atomic with respect to
+                                                        // other holders of the same lock.
+                                                        synchronized (checkPointLock) {
+                                                            // Parameterized, so nothing is
+                                                            // formatted unless debug is enabled.
+                                                            log.debug(
+                                                                    "{}_{} {} {}",
+                                                                    msg.getMsgId(),
+                                                                    msg.getBrokerName(),
+                                                                    msg.getQueueId(),
+                                                                    msg.getQueueOffset());
+                                                            context.collectWithTimestamp(
+                                                                    data, msg.getBornTimestamp());
+
+                                                            // update max eventTime per queue
+                                                            // waterMarkPerQueue.extractTimestamp(mq, msg.getBornTimestamp());
+                                                            waterMarkForAll.extractTimestamp(
+                                                                    msg.getBornTimestamp());
+                                                            tpsMetric.markEvent();
+                                                        }
+                                                    }
+                                                    found = true;
+                                                    break;
+                                                case NO_MATCHED_MSG:
+                                                    log.debug(
+                                                            "No matched message after offset {} for queue {}",
+                                                            offset,
+                                                            mq);
+                                                    break;
+                                                case NO_NEW_MSG:
+                                                    log.debug(
+                                                            "No new message after offset {} for queue {}",
+                                                            offset,
+                                                            mq);
+                                                    break;
+                                                case OFFSET_ILLEGAL:
+                                                    log.warn(
+                                                            "Offset {} is illegal for queue {}",
+                                                            offset,
+                                                            mq);
+                                                    break;
+                                                default:
+                                                    break;
+                                            }
+
+                                            // Advance to the offset reported by the pull result,
+                                            // whatever the pull status was.
+                                            synchronized (checkPointLock) {
+                                                updateMessageQueueOffset(
+                                                        mq, pullResult.getNextBeginOffset());
+                                            }
+
+                                            if (!found) {
+                                                RetryUtil.waitForMs(delayWhenMessageNotFound);
+                                            }
+                                        } catch (Exception e) {
+                                            throw new RuntimeException(e);
+                                        }
+                                    }
+                                    return true;
+                                },
+                                "RuntimeException");
+                    });
+        }
+
+        awaitTermination();
+    }
+
+    /**
+     * Blocks the calling thread until the running flag has been cleared, re-checking the flag
+     * every 50 ms.
+     *
+     * @throws InterruptedException if the thread is interrupted while sleeping between checks
+     */
+    private void awaitTermination() throws InterruptedException {
+        for (; ; ) {
+            if (!runningChecker.isRunning()) {
+                return;
+            }
+            Thread.sleep(50);
+        }
+    }
+
+    /**
+     * Resolves the offset from which the next pull on {@code mq} should start and records it in
+     * {@code offsetTable}.
+     *
+     * <p>If the table already holds an offset for the queue it is returned unchanged. Otherwise
+     * the committed offset is fetched from the broker; when the job was not restored from state,
+     * or the broker reports a negative offset, the offset is derived from the {@link
+     * RocketMQConfig#CONSUMER_OFFSET_RESET_TO} property (defaulting to the latest offset).
+     *
+     * @param mq the message queue to resolve the offset for
+     * @return the offset to pull from
+     * @throws MQClientException if the consumer fails to query the offset
+     * @throws IllegalArgumentException if the reset strategy property has an unknown value
+     */
+    private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
+        Long offset = offsetTable.get(mq);
+        // restoredOffsets(unionOffsetStates) is the restored global union state;
+        // should only snapshot mqs that actually belong to us
+        if (offset == null) {
+            // fetchConsumeOffset from broker
+            offset = consumer.fetchConsumeOffset(mq, false);
+            if (!restored || offset < 0) {
+                String initialOffset =
+                        props.getProperty(
+                                RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
+                switch (initialOffset) {
+                    case CONSUMER_OFFSET_EARLIEST:
+                        offset = consumer.minOffset(mq);
+                        break;
+                    case CONSUMER_OFFSET_LATEST:
+                        offset = consumer.maxOffset(mq);
+                        break;
+                    case CONSUMER_OFFSET_TIMESTAMP:
+                        // Falls back to "now" when no explicit timestamp is configured.
+                        offset =
+                                consumer.searchOffset(
+                                        mq,
+                                        getLong(
+                                                props,
+                                                RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP,
+                                                System.currentTimeMillis()));
+                        break;
+                    default:
+                        throw new IllegalArgumentException(
+                                "Unknown value for CONSUMER_OFFSET_RESET_TO.");
+                }
+            }
+        }
+        offsetTable.put(mq, offset);
+        // Return the resolved value directly: cancel() may clear offsetTable concurrently, and
+        // reading it back could yield null and fail on unboxing.
+        return offset;
+    }
+
+    /**
+     * Stores the next offset to pull for {@code mq} in {@code offsetTable}. When checkpointing is
+     * disabled the offset is additionally handed to the consumer right away; with checkpointing
+     * enabled that step is skipped here.
+     *
+     * @param mq the queue whose offset advanced
+     * @param offset the next offset to pull from
+     * @throws MQClientException if the consumer rejects the offset update
+     */
+    private void updateMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
+        offsetTable.put(mq, offset);
+        if (enableCheckpoint) {
+            return;
+        }
+        consumer.updateConsumeOffset(mq, offset);
+    }
+
+    /**
+     * Stops the source: clears the running flag, shuts down the consumer and empties the offset
+     * bookkeeping maps.
+     */
+    @Override
+    public void cancel() {
+        log.debug("cancel ...");
+        // Clear the flag first; the pull loops and awaitTermination() poll it to know when to stop.
+        runningChecker.setRunning(false);
+
+        if (consumer != null) {
+            consumer.shutdown();
+        }
+
+        // Each map may still be unset, so guard every clear individually.
+        if (offsetTable != null) {
+            offsetTable.clear();
+        }
+
+        if (restoredOffsets != null) {
+            restoredOffsets.clear();
+        }
+
+        if (pendingOffsetsToCommit != null) {
+            pendingOffsetsToCommit.clear();
+        }
+    }
+
+    /**
+     * Releases the source by running the same steps as {@link #cancel()} and then delegating to
+     * the superclass.
+     *
+     * @throws Exception if the superclass close fails
+     */
+    @Override
+    public void close() throws Exception {
+        log.debug("close ...");
+        try {
+            // Closing is handled exactly like a cancellation.
+            cancel();
+        } finally {
+            // Always let the superclass clean up, even if cancel() threw.
+            super.close();
+        }
+    }
+
+    /**
+     * Seeds {@code offsetTable} from {@code restoredOffsets}. For every restored queue the table
+     * ends up holding the larger of its current offset (if any) and the restored offset.
+     *
+     * @throws NullPointerException if {@code restoredOffsets} has not been initialized
+     */
+    public void initOffsetTableFromRestoredOffsets() {
+        Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
+        // merge() inserts the offset when the queue is absent, otherwise keeps the maximum.
+        restoredOffsets.forEach((mq, offset) -> offsetTable.merge(mq, offset, Long::max));
+        // The placeholder is required; without it the offsetTable argument is silently dropped.
+        log.info("init offset table from restoredOffsets successful. {}", offsetTable);
+    }
+
+    /**
+     * Snapshots the current per-queue offsets when a checkpoint is requested.
+     *
+     * <p>Before snapshotting, the queue allocation for this subtask is recomputed and compared
+     * with {@code messageQueues}; a mismatch is raised as a {@link RuntimeException} inside
+     * {@link RetryUtil#call}. Afterwards every entry of {@code offsetTable} is written to the
+     * union list state and a copy is stored in {@code pendingOffsetsToCommit} under the checkpoint
+     * id.
+     *
+     * @param context provides the checkpoint id and timestamp
+     * @throws Exception if writing the operator state fails
+     */
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        // called when a snapshot for a checkpoint is requested
+        log.info("Snapshotting state {} ...", context.getCheckpointId());
+        if (!runningChecker.isRunning()) {
+            log.info("snapshotState() called on closed source; returning null.");
+            return;
+        }
+
+        // Discovery topic Route change when snapshot
+        RetryUtil.call(
+                () -> {
+                    Collection<MessageQueue> totalQueues =
+                            consumer.fetchSubscribeMessageQueues(topic);
+                    int taskNumber = getRuntimeContext().getNumberOfParallelSubtasks();
+                    int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
+                    List<MessageQueue> newQueues =
+                            RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
+                    Collections.sort(newQueues);
+                    if (!messageQueues.equals(newQueues)) {
+                        throw new RuntimeException(
+                                "Topic route changed for subtask "
+                                        + taskIndex
+                                        + ": expected "
+                                        + messageQueues
+                                        + " but found "
+                                        + newQueues);
+                    }
+                    // Only report an unchanged route once the comparison has actually passed.
+                    log.debug("{} Topic route is same.", taskIndex);
+                    return true;
+                },
+                "RuntimeException due to topic route changed");
+
+        unionOffsetStates.clear();
+        HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());
+        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
+            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
+            currentOffsets.put(entry.getKey(), entry.getValue());
+        }
+        // Kept until notifyCheckpointComplete() confirms this checkpoint id.
+        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+        log.info(
+                "Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
+                offsetTable,
+                context.getCheckpointId(),
+                context.getCheckpointTimestamp());
+    }
+
+ /**
+ * called every time the user-defined function is initialized, be that when the function is
+ * first initialized or be that when the function is actually recovering from an earlier
+ * checkpoint. Given this, initializeState() is not only the place where different types of
+ * state are initialized, but also where state recovery logic is included.
+ */
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ log.info("initialize State ...");
+
+ this.unionOffsetStates =
+ context.getOperatorStateStore()
+ .getUnionListState(
+ new ListStateDescriptor<>(
+ OFFSETS_STATE_NAME,
+ TypeInformation.of(
+ new TypeHint<Tuple2<MessageQueue, Long>>() {})));
+ this.restored = context.isRestored();
+
+ if (restored) {
+ if (restoredOffsets == null) {
+ restoredOffsets = new ConcurrentHashMap<>();
+ }
+ for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
+ if (!restoredOffsets.containsKey(mqOffsets.f0)
+ || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
+ restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
+ }
+ }
+ log.info(
+ "Setting restore state in the consumer. Using the following offsets: {}",
+ restoredOffsets);
+ } else {
+ log.info("No restore state for the consumer.");
+ }
+ }
+
+    /**
+     * Returns the type information of the records this source emits, as reported by the
+     * deserialization schema.
+     *
+     * @return the produced type of {@code schema}
+     */
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return schema.getProducedType();
+    }
+
+ /**
+ * Callback for a completed checkpoint: hands the offsets that were recorded for that
+ * checkpoint in {@code pendingOffsetsToCommit} to the consumer and discards the entries of
+ * all earlier checkpoints.
+ *
+ * @param checkpointId id of the checkpoint that completed
+ * @throws Exception if updating a consume offset fails
+ */
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // callback when checkpoint complete
+ if (!runningChecker.isRunning()) {
+ log.info("notifyCheckpointComplete() called on closed source; returning null.");
+ return;
+ }
+
+ // Position of this checkpoint among the pending entries; -1 means it was never recorded.
+ final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+ if (posInMap == -1) {
+ log.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+ return;
+ }
+
+ Map<MessageQueue, Long> offsets =
+ (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap);
+
+ // remove older checkpoints in map
+ // (the posInMap entries ahead of the confirmed one; each removal shifts the next to index 0)
+ for (int i = 0; i < posInMap; i++) {
+ pendingOffsetsToCommit.remove(0);
+ }
+
+ if (offsets == null || offsets.size() == 0) {
+ log.debug("Checkpoint state was empty.");
+ return;
+ }
+
+ // Pass every offset recorded for the confirmed checkpoint to the consumer.
+ for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
+ consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
+ }
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java b/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java
new file mode 100644
index 0000000..c48361a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy;
+
+import java.io.Serializable;
+
+/**
+ * Serializable holder of a single on/off flag. The flag starts out {@code false} and is declared
+ * {@code volatile} so that a change made by one thread is visible to threads polling it.
+ */
+public class RunningChecker implements Serializable {
+    // Field name kept as-is: it is part of the class's default serialized form.
+    private volatile boolean isRunning = false;
+
+    /**
+     * @return the current value of the flag
+     */
+    public boolean isRunning() {
+        return this.isRunning;
+    }
+
+    /**
+     * @param running the new value of the flag
+     */
+    public void setRunning(boolean running) {
+        this.isRunning = running;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..6be5218
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.selector;
+
+/**
+ * {@link TopicSelector} that ignores the element and always answers with the topic and tag given
+ * at construction time.
+ *
+ * @param <T> type of the elements passed to the selector
+ */
+public class DefaultTopicSelector<T> implements TopicSelector<T> {
+    private final String topicName;
+    private final String tagName;
+
+    /**
+     * Creates a selector for {@code topicName} with an empty tag.
+     *
+     * @param topicName topic returned for every element
+     */
+    public DefaultTopicSelector(final String topicName) {
+        this(topicName, "");
+    }
+
+    /**
+     * @param topicName topic returned for every element
+     * @param tagName tag returned for every element
+     */
+    public DefaultTopicSelector(final String topicName, final String tagName) {
+        this.topicName = topicName;
+        this.tagName = tagName;
+    }
+
+    /** Returns the fixed topic; {@code tuple} is not inspected. */
+    @Override
+    public String getTopic(T tuple) {
+        return this.topicName;
+    }
+
+    /** Returns the fixed tag; {@code tuple} is not inspected. */
+    @Override
+    public String getTag(T tuple) {
+        return this.tagName;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java
similarity index 55%
rename from src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java
index 3ad8a03..674b5a0 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java
@@ -1,31 +1,25 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.rocketmq.flink.common.selector;
-
-import java.util.Map;
+package org.apache.rocketmq.flink.legacy.common.selector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Uses field name to select topic and tag name from tuple.
- */
+import java.util.Map;
+
+/** Uses field name to select topic and tag name from tuple. */
public class SimpleTopicSelector implements TopicSelector<Map> {
private static final Logger LOG = LoggerFactory.getLogger(SimpleTopicSelector.class);
@@ -37,12 +31,17 @@ public class SimpleTopicSelector implements TopicSelector<Map> {
/**
* SimpleTopicSelector Constructor.
+ *
* @param topicFieldName field name used for selecting topic
* @param defaultTopicName default field name used for selecting topic
* @param tagFieldName field name used for selecting tag
* @param defaultTagName default field name used for selecting tag
*/
- public SimpleTopicSelector(String topicFieldName, String defaultTopicName, String tagFieldName, String defaultTagName) {
+ public SimpleTopicSelector(
+ String topicFieldName,
+ String defaultTopicName,
+ String tagFieldName,
+ String defaultTagName) {
this.topicFieldName = topicFieldName;
this.defaultTopicName = defaultTopicName;
this.tagFieldName = tagFieldName;
@@ -52,10 +51,13 @@ public class SimpleTopicSelector implements TopicSelector<Map> {
@Override
public String getTopic(Map tuple) {
if (tuple.containsKey(topicFieldName)) {
- Object topic = tuple.get(topicFieldName);
+ Object topic = tuple.get(topicFieldName);
return topic != null ? topic.toString() : defaultTopicName;
} else {
- LOG.warn("Field {} Not Found. Returning default topic {}", topicFieldName, defaultTopicName);
+ LOG.warn(
+ "Field {} Not Found. Returning default topic {}",
+ topicFieldName,
+ defaultTopicName);
return defaultTopicName;
}
}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java
new file mode 100644
index 0000000..581dadc
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.selector;
+
+import java.io.Serializable;
+
+/**
+ * Serializable strategy that derives a topic name and a tag name from an element.
+ *
+ * @param <T> type of the element the topic and tag are derived from
+ */
+public interface TopicSelector<T> extends Serializable {
+
+ /**
+ * @param tuple the element to select a topic for
+ * @return the topic name for {@code tuple}
+ */
+ String getTopic(T tuple);
+
+ /**
+ * @param tuple the element to select a tag for
+ * @return the tag name for {@code tuple}
+ */
+ String getTag(T tuple);
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/ForwardMessageExtDeserialization.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/ForwardMessageExtDeserialization.java
similarity index 84%
rename from src/main/java/org/apache/rocketmq/flink/common/serialization/ForwardMessageExtDeserialization.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/ForwardMessageExtDeserialization.java
index 20dd700..6c6fa74 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/ForwardMessageExtDeserialization.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/ForwardMessageExtDeserialization.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.common.serialization;
+package org.apache.rocketmq.flink.legacy.common.serialization;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.rocketmq.common.message.MessageExt;
-/**
- * A Forward messageExt deserialization.
- */
-public class ForwardMessageExtDeserialization implements MessageExtDeserializationScheme<MessageExt> {
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/** A Forward messageExt deserialization. */
+public class ForwardMessageExtDeserialization
+ implements MessageExtDeserializationScheme<MessageExt> {
@Override
public MessageExt deserializeMessageExt(MessageExt messageExt) {
@@ -34,4 +34,4 @@ public class ForwardMessageExtDeserialization implements MessageExtDeserializati
public TypeInformation<MessageExt> getProducedType() {
return TypeInformation.of(MessageExt.class);
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java
new file mode 100644
index 0000000..4cc8c61
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.Serializable;
+
+/**
+ * Serializable schema that turns a raw key/value byte pair into a record of type {@code T} and
+ * reports the produced type through {@link ResultTypeQueryable}.
+ *
+ * @param <T> type of the deserialized record
+ */
+public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T>, Serializable {
+ /**
+ * Builds a record from the given key and value bytes.
+ *
+ * @param key raw key bytes; NOTE(review): the legacy source appears to pass {@code null} when
+ *     a message has no keys, so implementations should tolerate null - confirm against callers
+ * @param value raw value bytes
+ * @return the deserialized record
+ */
+ T deserializeKeyAndValue(byte[] key, byte[] value);
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java
new file mode 100644
index 0000000..66b2e29
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import java.io.Serializable;
+
+/**
+ * Serializable schema that turns an element into separate key bytes and value bytes.
+ *
+ * @param <T> type of the element being serialized
+ */
+public interface KeyValueSerializationSchema<T> extends Serializable {
+
+ /**
+ * @param tuple the element to serialize
+ * @return the key bytes for {@code tuple}
+ */
+ byte[] serializeKey(T tuple);
+
+ /**
+ * @param tuple the element to serialize
+ * @return the value bytes for {@code tuple}; implementations such as
+ *     SimpleKeyValueSerializationSchema return {@code null} when the element has no value
+ */
+ byte[] serializeValue(T tuple);
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/MessageExtDeserializationScheme.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/MessageExtDeserializationScheme.java
similarity index 95%
rename from src/main/java/org/apache/rocketmq/flink/common/serialization/MessageExtDeserializationScheme.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/MessageExtDeserializationScheme.java
index 4c8cf85..173823e 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/MessageExtDeserializationScheme.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/MessageExtDeserializationScheme.java
@@ -15,12 +15,14 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.common.serialization;
+package org.apache.rocketmq.flink.legacy.common.serialization;
-import java.io.Serializable;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.Serializable;
+
/**
* The interface Message ext deserialization scheme.
*
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
similarity index 60%
rename from src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
index 93d5d9b..7dada93 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
@@ -1,31 +1,25 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.flink.legacy.common.serialization;
-package org.apache.rocketmq.flink.common.serialization;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-
public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializationSchema<Map> {
public static final String DEFAULT_KEY_FIELD = "key";
public static final String DEFAULT_VALUE_FIELD = "value";
@@ -39,8 +33,9 @@ public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializat
/**
* SimpleKeyValueDeserializationSchema Constructor.
+ *
* @param keyField tuple field for selecting the key
- * @param valueField tuple field for selecting the value
+ * @param valueField tuple field for selecting the value
*/
public SimpleKeyValueDeserializationSchema(String keyField, String valueField) {
this.keyField = keyField;
@@ -65,4 +60,4 @@ public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializat
public TypeInformation<Map> getProducedType() {
return TypeInformation.of(Map.class);
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java
similarity index 60%
rename from src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java
index bbd6da3..3e92ad2 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java
@@ -1,22 +1,18 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.rocketmq.flink.common.serialization;
+package org.apache.rocketmq.flink.legacy.common.serialization;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@@ -34,8 +30,9 @@ public class SimpleKeyValueSerializationSchema implements KeyValueSerializationS
/**
* SimpleKeyValueSerializationSchema Constructor.
+ *
* @param keyField tuple field for selecting the key
- * @param valueField tuple field for selecting the value
+ * @param valueField tuple field for selecting the value
*/
public SimpleKeyValueSerializationSchema(String keyField, String valueField) {
this.keyField = keyField;
@@ -59,5 +56,4 @@ public class SimpleKeyValueSerializationSchema implements KeyValueSerializationS
Object value = tuple.get(valueField);
return value != null ? value.toString().getBytes(StandardCharsets.UTF_8) : null;
}
-
}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleTupleDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleTupleDeserializationSchema.java
similarity index 71%
rename from src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleTupleDeserializationSchema.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleTupleDeserializationSchema.java
index 54106ef..3bac266 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleTupleDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleTupleDeserializationSchema.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.flink.common.serialization;
+package org.apache.rocketmq.flink.legacy.common.serialization;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -6,7 +6,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
import java.nio.charset.StandardCharsets;
-public class SimpleTupleDeserializationSchema implements KeyValueDeserializationSchema<Tuple2<String, String>> {
+public class SimpleTupleDeserializationSchema
+ implements KeyValueDeserializationSchema<Tuple2<String, String>> {
@Override
public Tuple2<String, String> deserializeKeyAndValue(byte[] key, byte[] value) {
@@ -17,6 +18,6 @@ public class SimpleTupleDeserializationSchema implements KeyValueDeserialization
@Override
public TypeInformation<Tuple2<String, String>> getProducedType() {
- return TypeInformation.of(new TypeHint<Tuple2<String,String>>(){});
+ return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
}
}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
new file mode 100644
index 0000000..bb3baeb
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.util;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.SimpleCounter;
+
+/** RocketMQ connector metrics. */
+/** RocketMQ connector metrics. */
+public class MetricUtils {
+
+    public static final String METRICS_TPS = "tps";
+
+    private static final String METRIC_GROUP_SINK = "sink";
+    private static final String METRICS_SINK_IN_TPS = "inTps";
+    private static final String METRICS_SINK_OUT_TPS = "outTps";
+    private static final String METRICS_SINK_OUT_BPS = "outBps";
+    private static final String METRICS_SINK_OUT_LATENCY = "outLatency";
+
+    /** Suffix appended to a meter name to name the counter backing it. */
+    private static final String COUNTER_SUFFIX = "_counter";
+
+    /** Time span argument handed to every {@link MeterView} created here. */
+    private static final int METER_TIME_SPAN = 60;
+
+    /**
+     * Registers the {@code inTps} meter in the {@code sink} metric group.
+     *
+     * @param context runtime context whose metric group receives the meter
+     * @return the registered meter
+     */
+    public static Meter registerSinkInTps(RuntimeContext context) {
+        return registerSinkMeter(context, METRICS_SINK_IN_TPS);
+    }
+
+    /**
+     * Registers the {@code outTps} meter in the {@code sink} metric group.
+     *
+     * @param context runtime context whose metric group receives the meter
+     * @return the registered meter
+     */
+    public static Meter registerOutTps(RuntimeContext context) {
+        return registerSinkMeter(context, METRICS_SINK_OUT_TPS);
+    }
+
+    /**
+     * Registers the {@code outBps} meter in the {@code sink} metric group.
+     *
+     * @param context runtime context whose metric group receives the meter
+     * @return the registered meter
+     */
+    public static Meter registerOutBps(RuntimeContext context) {
+        return registerSinkMeter(context, METRICS_SINK_OUT_BPS);
+    }
+
+    /**
+     * Registers the {@code outLatency} gauge in the {@code sink} metric group.
+     *
+     * @param context runtime context whose metric group receives the gauge
+     * @return the registered gauge, to be fed through {@link LatencyGauge#report(long, long)}
+     */
+    public static LatencyGauge registerOutLatency(RuntimeContext context) {
+        return context.getMetricGroup()
+                .addGroup(METRIC_GROUP_SINK)
+                .gauge(METRICS_SINK_OUT_LATENCY, new LatencyGauge());
+    }
+
+    /** Registers a counter named {@code <name>_counter} and a meter {@code name} backed by it. */
+    private static Meter registerSinkMeter(RuntimeContext context, String name) {
+        Counter counter =
+                context.getMetricGroup()
+                        .addGroup(METRIC_GROUP_SINK)
+                        .counter(name + COUNTER_SUFFIX, new SimpleCounter());
+        return context.getMetricGroup()
+                .addGroup(METRIC_GROUP_SINK)
+                .meter(name, new MeterView(counter, METER_TIME_SPAN));
+    }
+
+    /** Gauge exposing the average time per record of the most recently reported batch. */
+    public static class LatencyGauge implements Gauge<Double> {
+        // volatile so that a value reported by one thread is visible to a thread reading the gauge
+        private volatile double value;
+
+        /**
+         * Updates the gauge to {@code timeDelta / batchSize}. A batch size of zero leaves the
+         * previous value untouched.
+         *
+         * @param timeDelta time spent on the batch
+         * @param batchSize number of records in the batch
+         */
+        public void report(long timeDelta, long batchSize) {
+            if (batchSize != 0) {
+                this.value = (1.0 * timeDelta) / batchSize;
+            }
+        }
+
+        @Override
+        public Double getValue() {
+            return value;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/util/RetryUtil.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
similarity index 59%
rename from src/main/java/org/apache/rocketmq/flink/common/util/RetryUtil.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
index 0dbd553..7ec1dca 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/util/RetryUtil.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
@@ -1,22 +1,18 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.rocketmq.flink.common.util;
+package org.apache.rocketmq.flink.legacy.common.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,8 +26,7 @@ public class RetryUtil {
private static final long MAX_BACKOFF = 5000;
private static final int MAX_ATTEMPTS = 5;
- private RetryUtil() {
- }
+ private RetryUtil() {}
public static void waitForMs(long sleepMs) {
try {
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java
new file mode 100644
index 0000000..94a24a1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.util;
+
+import org.apache.rocketmq.client.AccessChannel;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/** Static helpers for reading typed connector properties and distributing message queues. */
+public final class RocketMQUtils {
+
+    /** Utility class; not meant to be instantiated. */
+    private RocketMQUtils() {}
+
+    /**
+     * Reads an {@code int} property.
+     *
+     * @param props properties to read from
+     * @param key property name
+     * @param defaultValue value used when the key is absent
+     * @return the parsed value
+     * @throws NumberFormatException if the configured value is not a valid integer
+     */
+    public static int getInteger(Properties props, String key, int defaultValue) {
+        return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
+    }
+
+    /**
+     * Reads a {@code long} property.
+     *
+     * @param props properties to read from
+     * @param key property name
+     * @param defaultValue value used when the key is absent
+     * @return the parsed value
+     * @throws NumberFormatException if the configured value is not a valid long
+     */
+    public static long getLong(Properties props, String key, long defaultValue) {
+        return Long.parseLong(props.getProperty(key, String.valueOf(defaultValue)));
+    }
+
+    /**
+     * Reads a {@code boolean} property using {@link Boolean#parseBoolean(String)}.
+     *
+     * @param props properties to read from
+     * @param key property name
+     * @param defaultValue value used when the key is absent
+     * @return the parsed value
+     */
+    public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
+        return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
+    }
+
+    /**
+     * Reads an {@link AccessChannel} property.
+     *
+     * @param props properties to read from
+     * @param key property name
+     * @param defaultValue value returned when the key is absent; may be {@code null}
+     * @return the configured channel, or {@code defaultValue} when the key is absent
+     * @throws IllegalArgumentException if the configured value names no {@link AccessChannel}
+     */
+    public static AccessChannel getAccessChannel(
+            Properties props, String key, AccessChannel defaultValue) {
+        // Return the default directly instead of round-tripping it through a String, so that a
+        // null default does not become the literal "null" and fail in valueOf.
+        String configured = props.getProperty(key);
+        return configured == null ? defaultValue : AccessChannel.valueOf(configured);
+    }
+
+    /**
+     * Builds a client instance name.
+     *
+     * @param args name parts; when at least one is given they are joined with {@code "_"}
+     * @return the joined parts, or the runtime MXBean name followed by {@code "_"} and {@link
+     *     System#nanoTime()} when no parts are given
+     */
+    public static String getInstanceName(String... args) {
+        if (null != args && args.length > 0) {
+            return String.join("_", args);
+        }
+        return ManagementFactory.getRuntimeMXBean().getName() + "_" + System.nanoTime();
+    }
+
+    /**
+     * Assigns a contiguous slice of the sorted queues to one parallel task, spreading the queues
+     * as evenly as possible. The algorithm follows {@code
+     * org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely}.
+     *
+     * <p>When the queue count is not a multiple of the task count, the first {@code size %
+     * numberOfParallelTasks} tasks receive one extra queue. Tasks for which no queue is left get
+     * an empty list.
+     *
+     * @param mqSet all queues to distribute
+     * @param numberOfParallelTasks total number of tasks sharing the queues; must be positive
+     * @param indexOfThisTask zero-based index of the calling task; must not be negative
+     * @return the queues assigned to the calling task, in sorted order
+     * @throws IllegalArgumentException if {@code numberOfParallelTasks} is not positive or {@code
+     *     indexOfThisTask} is negative
+     */
+    public static List<MessageQueue> allocate(
+            Collection<MessageQueue> mqSet, int numberOfParallelTasks, int indexOfThisTask) {
+        if (numberOfParallelTasks <= 0) {
+            // Would otherwise surface as an opaque "/ by zero" ArithmeticException below.
+            throw new IllegalArgumentException(
+                    "numberOfParallelTasks must be positive, but was " + numberOfParallelTasks);
+        }
+        if (indexOfThisTask < 0) {
+            // Would otherwise produce a negative start index and an IndexOutOfBoundsException.
+            throw new IllegalArgumentException(
+                    "indexOfThisTask must not be negative, but was " + indexOfThisTask);
+        }
+        List<MessageQueue> mqAll = new ArrayList<>(mqSet);
+        // Every task must see the same ordering for the slices to be disjoint.
+        Collections.sort(mqAll);
+        List<MessageQueue> result = new ArrayList<>();
+        int mod = mqAll.size() % numberOfParallelTasks;
+        int averageSize =
+                mqAll.size() <= numberOfParallelTasks
+                        ? 1
+                        : (mod > 0 && indexOfThisTask < mod
+                                ? mqAll.size() / numberOfParallelTasks + 1
+                                : mqAll.size() / numberOfParallelTasks);
+        int startIndex =
+                (mod > 0 && indexOfThisTask < mod)
+                        ? indexOfThisTask * averageSize
+                        : indexOfThisTask * averageSize + mod;
+        // A non-positive range means there are more tasks than queues and this task gets none.
+        int range = Math.min(averageSize, mqAll.size() - startIndex);
+        for (int i = 0; i < range; i++) {
+            result.add(mqAll.get((startIndex + i) % mqAll.size()));
+        }
+        return result;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
new file mode 100644
index 0000000..407aec7
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.util;
+
+import java.lang.reflect.Field;
+
+/** Reflection helpers for injecting state into objects under test. */
+public class TestUtils {
+
+    /**
+     * Sets a field of {@code obj} via reflection, regardless of its visibility.
+     *
+     * <p>The field is looked up on the runtime class of {@code obj} first and then on each
+     * superclass, so inherited private fields can be set as well. Failures are reported to the
+     * caller instead of being printed and ignored, so a test cannot continue with a fixture that
+     * was never applied.
+     *
+     * @param obj target object; must not be {@code null}
+     * @param fieldName name of the field to set
+     * @param value new field value
+     * @throws IllegalArgumentException if {@code obj} is {@code null}, no such field exists in
+     *     the class hierarchy, or {@code value} is not assignable to the field
+     * @throws IllegalStateException if the field cannot be written
+     */
+    public static void setFieldValue(Object obj, String fieldName, Object value) {
+        if (obj == null) {
+            throw new IllegalArgumentException("obj must not be null");
+        }
+        Field field = findField(obj.getClass(), fieldName);
+        if (field == null) {
+            throw new IllegalArgumentException(
+                    "No field '" + fieldName + "' found on " + obj.getClass().getName());
+        }
+        try {
+            field.setAccessible(true);
+            field.set(obj, value);
+        } catch (IllegalAccessException e) {
+            throw new IllegalStateException(
+                    "Cannot set field '" + fieldName + "' on " + obj.getClass().getName(), e);
+        }
+    }
+
+    /** Returns the first field with the given name in the class hierarchy, or null if none. */
+    private static Field findField(Class<?> type, String fieldName) {
+        for (Class<?> current = type; current != null; current = current.getSuperclass()) {
+            try {
+                return current.getDeclaredField(fieldName);
+            } catch (NoSuchFieldException ignored) {
+                // Not declared on this class; keep looking further up the hierarchy.
+            }
+        }
+        return null;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/watermark/BoundedOutOfOrdernessGenerator.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGenerator.java
similarity index 84%
rename from src/main/java/org/apache/rocketmq/flink/common/watermark/BoundedOutOfOrdernessGenerator.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGenerator.java
index 7e38f27..2b56f54 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/watermark/BoundedOutOfOrdernessGenerator.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGenerator.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.common.watermark;
+package org.apache.rocketmq.flink.legacy.common.watermark;
+
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.rocketmq.common.message.MessageExt;
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MessageExt> {
@@ -27,8 +28,7 @@ public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWater
private long currentMaxTimestamp;
- public BoundedOutOfOrdernessGenerator() {
- }
+ public BoundedOutOfOrdernessGenerator() {}
public BoundedOutOfOrdernessGenerator(long maxOutOfOrderness) {
this.maxOutOfOrderness = maxOutOfOrderness;
@@ -49,9 +49,11 @@ public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWater
@Override
public String toString() {
- return "BoundedOutOfOrdernessGenerator{" +
- "maxOutOfOrderness=" + maxOutOfOrderness +
- ", currentMaxTimestamp=" + currentMaxTimestamp +
- '}';
+ return "BoundedOutOfOrdernessGenerator{"
+ + "maxOutOfOrderness="
+ + maxOutOfOrderness
+ + ", currentMaxTimestamp="
+ + currentMaxTimestamp
+ + '}';
}
}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
similarity index 79%
rename from src/main/java/org/apache/rocketmq/flink/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
index e56b34c..ab49131 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
@@ -15,27 +15,24 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.common.watermark;
+package org.apache.rocketmq.flink.legacy.common.watermark;
+
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-/**
- * 取每条队列中的最大eventTime的最小值作为当前source的watermark
- */
-public class BoundedOutOfOrdernessGeneratorPerQueue implements AssignerWithPeriodicWatermarks<MessageExt> {
+/** Uses the minimum of the per-queue maximum event times as the watermark of the current source. */
+public class BoundedOutOfOrdernessGeneratorPerQueue
+ implements AssignerWithPeriodicWatermarks<MessageExt> {
private Map<String, Long> maxEventTimeTable;
private long maxOutOfOrderness = 5000L; // 5 seconds
- public BoundedOutOfOrdernessGeneratorPerQueue() {
- }
+ public BoundedOutOfOrdernessGeneratorPerQueue() {}
public BoundedOutOfOrdernessGeneratorPerQueue(long maxOutOfOrderness) {
this.maxOutOfOrderness = maxOutOfOrderness;
@@ -63,9 +60,11 @@ public class BoundedOutOfOrdernessGeneratorPerQueue implements AssignerWithPerio
@Override
public String toString() {
- return "BoundedOutOfOrdernessGeneratorPerQueue{" +
- "maxEventTimeTable=" + maxEventTimeTable +
- ", maxOutOfOrderness=" + maxOutOfOrderness +
- '}';
+ return "BoundedOutOfOrdernessGeneratorPerQueue{"
+ + "maxEventTimeTable="
+ + maxEventTimeTable
+ + ", maxOutOfOrderness="
+ + maxOutOfOrderness
+ + '}';
}
}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/watermark/PunctuatedAssigner.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/PunctuatedAssigner.java
similarity index 64%
rename from src/main/java/org/apache/rocketmq/flink/common/watermark/PunctuatedAssigner.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/PunctuatedAssigner.java
index 354eecc..946e873 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/watermark/PunctuatedAssigner.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/PunctuatedAssigner.java
@@ -15,23 +15,24 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.common.watermark;
+package org.apache.rocketmq.flink.legacy.common.watermark;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.flink.RocketMQConfig;
/**
- * With Punctuated Watermarks
- * To generate watermarks whenever a certain event indicates that a new watermark might be generated, use
- * AssignerWithPunctuatedWatermarks. For this class Flink will first call the extractTimestamp(...) method to assign the
- * element a timestamp, and then immediately call the checkAndGetNextWatermark(...) method on that element.
+ * With Punctuated Watermarks: to generate watermarks whenever a certain event indicates that a new
+ * watermark might be generated, use AssignerWithPunctuatedWatermarks. For this class Flink will
+ * first call the extractTimestamp(...) method to assign the element a timestamp, and then
+ * immediately call the checkAndGetNextWatermark(...) method on that element.
*
- * The checkAndGetNextWatermark(...) method is passed the timestamp that was assigned in the extractTimestamp(...)
- * method, and can decide whether it wants to generate a watermark. Whenever the checkAndGetNextWatermark(...) method
- * returns a non-null watermark, and that watermark is larger than the latest previous watermark, that new watermark
- * will be emitted.
+ * <p>The checkAndGetNextWatermark(...) method is passed the timestamp that was assigned in the
+ * extractTimestamp(...) method, and can decide whether it wants to generate a watermark. Whenever
+ * the checkAndGetNextWatermark(...) method returns a non-null watermark, and that watermark is
+ * larger than the latest previous watermark, that new watermark will be emitted.
*/
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MessageExt> {
@Override
diff --git a/src/main/java/org/apache/rocketmq/flink/common/watermark/TimeLagWatermarkGenerator.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/TimeLagWatermarkGenerator.java
similarity index 82%
rename from src/main/java/org/apache/rocketmq/flink/common/watermark/TimeLagWatermarkGenerator.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/TimeLagWatermarkGenerator.java
index beec8f3..66cb3cd 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/watermark/TimeLagWatermarkGenerator.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/TimeLagWatermarkGenerator.java
@@ -15,21 +15,21 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.common.watermark;
+package org.apache.rocketmq.flink.legacy.common.watermark;
+
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.rocketmq.common.message.MessageExt;
/**
- * This generator generates watermarks that are lagging behind processing time by a certain amount. It assumes that
- * elements arrive in Flink after at most a certain time.
+ * This generator generates watermarks that are lagging behind processing time by a certain amount.
+ * It assumes that elements arrive in Flink after at most a certain time.
*/
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MessageExt> {
private long maxTimeLag = 5000; // 5 seconds
- TimeLagWatermarkGenerator() {
- }
+ TimeLagWatermarkGenerator() {}
TimeLagWatermarkGenerator(long maxTimeLag) {
this.maxTimeLag = maxTimeLag;
@@ -46,9 +46,8 @@ public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
- @Override public String toString() {
- return "TimeLagWatermarkGenerator{" +
- "maxTimeLag=" + maxTimeLag +
- '}';
+ @Override
+ public String toString() {
+ return "TimeLagWatermarkGenerator{" + "maxTimeLag=" + maxTimeLag + '}';
}
}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/watermark/WaterMarkForAll.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/WaterMarkForAll.java
similarity index 83%
rename from src/main/java/org/apache/rocketmq/flink/common/watermark/WaterMarkForAll.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/WaterMarkForAll.java
index a80fb69..8fadd77 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/watermark/WaterMarkForAll.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/WaterMarkForAll.java
@@ -15,14 +15,9 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.common.watermark;
+package org.apache.rocketmq.flink.legacy.common.watermark;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.rocketmq.common.message.MessageQueue;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
public class WaterMarkForAll {
@@ -30,8 +25,7 @@ public class WaterMarkForAll {
private long maxTimestamp = 0L;
- public WaterMarkForAll() {
- }
+ public WaterMarkForAll() {}
public WaterMarkForAll(long maxOutOfOrderness) {
this.maxOutOfOrderness = maxOutOfOrderness;
diff --git a/src/main/java/org/apache/rocketmq/flink/common/watermark/WaterMarkPerQueue.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/WaterMarkPerQueue.java
similarity index 87%
rename from src/main/java/org/apache/rocketmq/flink/common/watermark/WaterMarkPerQueue.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/WaterMarkPerQueue.java
index 2210cfb..941dec5 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/watermark/WaterMarkPerQueue.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/WaterMarkPerQueue.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.common.watermark;
+package org.apache.rocketmq.flink.legacy.common.watermark;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -30,8 +31,7 @@ public class WaterMarkPerQueue {
private long maxOutOfOrderness = 5000L; // 5 seconds
- public WaterMarkPerQueue() {
- }
+ public WaterMarkPerQueue() {}
public WaterMarkPerQueue(long maxOutOfOrderness) {
this.maxOutOfOrderness = maxOutOfOrderness;
@@ -54,9 +54,11 @@ public class WaterMarkPerQueue {
@Override
public String toString() {
- return "WaterMarkPerQueue{" +
- "maxEventTimeTable=" + maxEventTimeTable +
- ", maxOutOfOrderness=" + maxOutOfOrderness +
- '}';
+ return "WaterMarkPerQueue{"
+ + "maxEventTimeTable="
+ + maxEventTimeTable
+ + ", maxOutOfOrderness="
+ + maxOutOfOrderness
+ + '}';
}
}
diff --git a/src/main/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java
similarity index 77%
rename from src/main/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java
index 1f24d96..b435726 100644
--- a/src/main/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java
@@ -15,7 +15,15 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.example;
+package org.apache.rocketmq.flink.legacy.example;
+
+import org.apache.rocketmq.client.AccessChannel;
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
+import org.apache.rocketmq.flink.legacy.RocketMQSink;
+import org.apache.rocketmq.flink.legacy.RocketMQSource;
+import org.apache.rocketmq.flink.legacy.common.serialization.SimpleTupleDeserializationSchema;
+import org.apache.rocketmq.flink.legacy.function.SinkMapFunction;
+import org.apache.rocketmq.flink.legacy.function.SourceMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
@@ -25,29 +33,23 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.flink.RocketMQConfig;
-import org.apache.rocketmq.flink.RocketMQSink;
-import org.apache.rocketmq.flink.RocketMQSource;
-import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema;
-import org.apache.rocketmq.flink.common.serialization.SimpleTupleDeserializationSchema;
-import org.apache.rocketmq.flink.function.SinkMapFunction;
-import org.apache.rocketmq.flink.function.SourceMapFunction;
import java.util.Properties;
-import static org.apache.rocketmq.flink.RocketMQConfig.CONSUMER_OFFSET_LATEST;
-import static org.apache.rocketmq.flink.RocketMQConfig.DEFAULT_CONSUMER_TAG;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_TAG;
public class RocketMQFlinkExample {
/**
* Source Config
+ *
* @return properties
*/
private static Properties getConsumerProps() {
Properties consumerProps = new Properties();
- consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR,
+ consumerProps.setProperty(
+ RocketMQConfig.NAME_SERVER_ADDR,
"http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080");
consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "${ConsumerGroup}");
consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "${SourceTopic}");
@@ -61,11 +63,13 @@ public class RocketMQFlinkExample {
/**
* Sink Config
+ *
* @return properties
*/
private static Properties getProducerProps() {
Properties producerProps = new Properties();
- producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR,
+ producerProps.setProperty(
+ RocketMQConfig.NAME_SERVER_ADDR,
"http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080");
producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, "${ProducerGroup}");
producerProps.setProperty(RocketMQConfig.ACCESS_KEY, "${AccessKey}");
@@ -100,22 +104,26 @@ public class RocketMQFlinkExample {
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
- env.getCheckpointConfig().enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ env.getCheckpointConfig()
+ .enableExternalizedCheckpoints(
+ CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Properties consumerProps = getConsumerProps();
Properties producerProps = getProducerProps();
SimpleTupleDeserializationSchema schema = new SimpleTupleDeserializationSchema();
- DataStreamSource<Tuple2<String, String>> source = env.addSource(
- new RocketMQSource<>(schema, consumerProps)).setParallelism(2);
+ DataStreamSource<Tuple2<String, String>> source =
+ env.addSource(new RocketMQSource<>(schema, consumerProps)).setParallelism(2);
source.print();
source.process(new SourceMapFunction())
.process(new SinkMapFunction("FLINK_SINK", "*"))
- .addSink(new RocketMQSink(producerProps).withBatchFlushOnCheckpoint(true).withBatchSize(32)
- .withAsync(true))
+ .addSink(
+ new RocketMQSink(producerProps)
+ .withBatchFlushOnCheckpoint(true)
+ .withBatchSize(32)
+ .withAsync(true))
.setParallelism(2);
env.execute("rocketmq-connect-flink");
diff --git a/src/main/java/org/apache/rocketmq/flink/example/SimpleConsumer.java b/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleConsumer.java
similarity index 72%
rename from src/main/java/org/apache/rocketmq/flink/example/SimpleConsumer.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleConsumer.java
index 601d37d..9e025bb 100644
--- a/src/main/java/org/apache/rocketmq/flink/example/SimpleConsumer.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleConsumer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.example;
+package org.apache.rocketmq.flink.legacy.example;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -28,6 +28,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,8 @@ public class SimpleConsumer {
private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);
// Consumer config
- private static final String NAME_SERVER_ADDR = "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080";
+ private static final String NAME_SERVER_ADDR =
+ "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080";
private static final String GROUP = "GID_SIMPLE_CONSUMER";
private static final String TOPIC = "SINK_TOPIC";
private static final String TAGS = "*";
@@ -48,8 +50,9 @@ public class SimpleConsumer {
}
public static void main(String[] args) {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
- GROUP, getAclRPCHook(), new AllocateMessageQueueAveragely());
+ DefaultMQPushConsumer consumer =
+ new DefaultMQPushConsumer(
+ GROUP, getAclRPCHook(), new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_ADDR);
// When using aliyun products, you need to set up channels
@@ -62,13 +65,19 @@ public class SimpleConsumer {
}
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
- for (MessageExt msg : msgs) {
- System.out.printf("%s %s %d %s\n", msg.getMsgId(), msg.getBrokerName(), msg.getQueueId(),
- new String(msg.getBody()));
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
+ consumer.registerMessageListener(
+ (MessageListenerConcurrently)
+ (msgs, context) -> {
+ for (MessageExt msg : msgs) {
+ System.out.printf(
+ "%s %s %d %s\n",
+ msg.getMsgId(),
+ msg.getBrokerName(),
+ msg.getQueueId(),
+ new String(msg.getBody()));
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
try {
consumer.start();
diff --git a/src/main/java/org/apache/rocketmq/flink/example/SimpleProducer.java b/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleProducer.java
similarity index 88%
rename from src/main/java/org/apache/rocketmq/flink/example/SimpleProducer.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleProducer.java
index 9d7ba45..ea24f60 100644
--- a/src/main/java/org/apache/rocketmq/flink/example/SimpleProducer.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleProducer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.example;
+package org.apache.rocketmq.flink.legacy.example;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -24,8 +24,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.flink.RocketMQSource;
import org.apache.rocketmq.remoting.RPCHook;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +36,8 @@ public class SimpleProducer {
private static final int MESSAGE_NUM = 10000;
// Producer config
- private static final String NAME_SERVER_ADDR = "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080";
+ private static final String NAME_SERVER_ADDR =
+ "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080";
private static final String PRODUCER_GROUP = "GID_SIMPLE_PRODUCER";
private static final String TOPIC = "SOURCE_TOPIC";
private static final String TAGS = "*";
@@ -49,8 +50,8 @@ public class SimpleProducer {
}
public static void main(String[] args) {
- DefaultMQProducer producer = new DefaultMQProducer(
- PRODUCER_GROUP, getAclRPCHook(), true, null);
+ DefaultMQProducer producer =
+ new DefaultMQProducer(PRODUCER_GROUP, getAclRPCHook(), true, null);
producer.setNamesrvAddr(NAME_SERVER_ADDR);
// When using aliyun products, you need to set up channels
@@ -68,7 +69,8 @@ public class SimpleProducer {
try {
SendResult sendResult = producer.send(msg);
assert sendResult != null;
- System.out.printf("send result: %s %s\n",
+ System.out.printf(
+ "send result: %s %s\n",
sendResult.getMsgId(), sendResult.getMessageQueue().toString());
Thread.sleep(50);
} catch (Exception e) {
diff --git a/src/main/java/org/apache/rocketmq/flink/function/SinkMapFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/function/SinkMapFunction.java
similarity index 91%
rename from src/main/java/org/apache/rocketmq/flink/function/SinkMapFunction.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/function/SinkMapFunction.java
index c3a6af5..f63b636 100644
--- a/src/main/java/org/apache/rocketmq/flink/function/SinkMapFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/function/SinkMapFunction.java
@@ -15,13 +15,15 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.function;
+package org.apache.rocketmq.flink.legacy.function;
+
+import org.apache.rocketmq.common.message.Message;
-import org.apache.commons.lang.Validate;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
-import org.apache.rocketmq.common.message.Message;
+
+import org.apache.commons.lang.Validate;
public class SinkMapFunction extends ProcessFunction<Tuple2<String, String>, Message> {
@@ -29,8 +31,7 @@ public class SinkMapFunction extends ProcessFunction<Tuple2<String, String>, Mes
private String tag;
- public SinkMapFunction() {
- }
+ public SinkMapFunction() {}
public SinkMapFunction(String topic, String tag) {
this.topic = topic;
@@ -38,7 +39,8 @@ public class SinkMapFunction extends ProcessFunction<Tuple2<String, String>, Mes
}
@Override
- public void processElement(Tuple2<String, String> tuple, Context ctx, Collector<Message> out) throws Exception {
+ public void processElement(Tuple2<String, String> tuple, Context ctx, Collector<Message> out)
+ throws Exception {
Validate.notNull(topic, "the message topic is null");
Validate.notNull(tuple.f1.getBytes(), "the message body is null");
diff --git a/src/main/java/org/apache/rocketmq/flink/function/SourceMapFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/function/SourceMapFunction.java
similarity index 76%
rename from src/main/java/org/apache/rocketmq/flink/function/SourceMapFunction.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/function/SourceMapFunction.java
index 8dd07c6..a49df95 100644
--- a/src/main/java/org/apache/rocketmq/flink/function/SourceMapFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/function/SourceMapFunction.java
@@ -15,16 +15,19 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.function;
+package org.apache.rocketmq.flink.legacy.function;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
-public class SourceMapFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>> {
+public class SourceMapFunction
+ extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>> {
@Override
- public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
+ public void processElement(
+ Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out)
+ throws Exception {
out.collect(new Tuple2<>(value.f0, value.f1));
}
}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
new file mode 100644
index 0000000..b899618
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source;
+
+import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumState;
+import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumStateSerializer;
+import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator;
+import org.apache.rocketmq.flink.source.reader.RocketMQPartitionSplitReader;
+import org.apache.rocketmq.flink.source.reader.RocketMQRecordEmitter;
+import org.apache.rocketmq.flink.source.reader.RocketMQSourceReader;
+import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQRecordDeserializationSchema;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitSerializer;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.util.function.Supplier;
+
+/**
+ * The RocketMQ implementation of Flink's {@link Source} interface.
+ *
+ * <p>The class itself only stores the settings it was constructed with and forwards them to the
+ * {@link RocketMQSourceEnumerator} and the {@link RocketMQPartitionSplitReader} instances it
+ * creates.
+ *
+ * @param <OUT> the type of the records produced by the deserialization schema.
+ */
+public class RocketMQSource<OUT>
+        implements Source<OUT, RocketMQPartitionSplit, RocketMQSourceEnumState>,
+                ResultTypeQueryable<OUT> {
+    private static final long serialVersionUID = -6755372893283732098L;
+
+    // Settings handed to both the enumerator and the split readers.
+    private final String topic;
+    private final String consumerGroup;
+    private final long stopInMs;
+    private final long startOffset;
+
+    // Settings handed to the split readers only.
+    private final String tag;
+    private final long startTime;
+
+    // Settings handed to the enumerator only.
+    private final long partitionDiscoveryIntervalMs;
+
+    // Boundedness
+    private final Boundedness boundedness;
+    private final RocketMQRecordDeserializationSchema<OUT> deserializationSchema;
+
+    /**
+     * Creates the source; every argument is stored as-is without validation.
+     *
+     * @param topic the topic passed to the enumerator and the split readers.
+     * @param consumerGroup the consumer group passed to the enumerator and the split readers.
+     * @param tag the tag passed to the split readers.
+     * @param stopInMs the stop timestamp passed to the enumerator and the split readers.
+     * @param startTime the start time passed to the split readers.
+     * @param startOffset the start offset passed to the enumerator and the split readers.
+     * @param partitionDiscoveryIntervalMs the discovery interval passed to the enumerator.
+     * @param boundedness the value returned by {@link #getBoundedness()}.
+     * @param deserializationSchema the schema used to deserialize the pulled messages.
+     */
+    public RocketMQSource(
+            String topic,
+            String consumerGroup,
+            String tag,
+            long stopInMs,
+            long startTime,
+            long startOffset,
+            long partitionDiscoveryIntervalMs,
+            Boundedness boundedness,
+            RocketMQRecordDeserializationSchema<OUT> deserializationSchema) {
+        this.topic = topic;
+        this.consumerGroup = consumerGroup;
+        this.tag = tag;
+        this.stopInMs = stopInMs;
+        this.startTime = startTime;
+        this.startOffset = startOffset;
+        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
+        this.boundedness = boundedness;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    /** Returns the boundedness this source was constructed with. */
+    @Override
+    public Boundedness getBoundedness() {
+        return this.boundedness;
+    }
+
+    /**
+     * Opens the deserialization schema and wires a {@link RocketMQSourceReader} together.
+     *
+     * @param readerContext the context of the reader; supplies the metric group for the schema.
+     * @return a new source reader backed by {@link RocketMQPartitionSplitReader} instances.
+     * @throws Exception if opening the deserialization schema fails.
+     */
+    @Override
+    public SourceReader<OUT, RocketMQPartitionSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
+        DeserializationSchema.InitializationContext schemaContext =
+                new DeserializationSchema.InitializationContext() {
+                    @Override
+                    public MetricGroup getMetricGroup() {
+                        return readerContext.metricGroup();
+                    }
+
+                    @Override
+                    public UserCodeClassLoader getUserCodeClassLoader() {
+                        // NOTE(review): no class loader is provided, a schema that asks for
+                        // the user code class loader receives null.
+                        return null;
+                    }
+                };
+        deserializationSchema.open(schemaContext);
+
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<OUT, Long, Long>>> elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        RocketMQRecordEmitter<OUT> recordEmitter = new RocketMQRecordEmitter<>();
+        // Every call of the supplier creates a split reader with its own consumer.
+        Supplier<SplitReader<Tuple3<OUT, Long, Long>, RocketMQPartitionSplit>> splitReaderSupplier =
+                () ->
+                        new RocketMQPartitionSplitReader<>(
+                                topic,
+                                consumerGroup,
+                                tag,
+                                stopInMs,
+                                startTime,
+                                startOffset,
+                                deserializationSchema);
+
+        return new RocketMQSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                recordEmitter,
+                new Configuration(),
+                readerContext);
+    }
+
+    /** Creates an enumerator that starts without any previous split assignment. */
+    @Override
+    public SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> createEnumerator(
+            SplitEnumeratorContext<RocketMQPartitionSplit> enumContext) {
+        return new RocketMQSourceEnumerator(
+                topic,
+                consumerGroup,
+                stopInMs,
+                startOffset,
+                partitionDiscoveryIntervalMs,
+                boundedness,
+                enumContext);
+    }
+
+    /** Creates an enumerator that is seeded with the split assignment stored in the checkpoint. */
+    @Override
+    public SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> restoreEnumerator(
+            SplitEnumeratorContext<RocketMQPartitionSplit> enumContext,
+            RocketMQSourceEnumState checkpoint) {
+        return new RocketMQSourceEnumerator(
+                topic,
+                consumerGroup,
+                stopInMs,
+                startOffset,
+                partitionDiscoveryIntervalMs,
+                boundedness,
+                enumContext,
+                checkpoint.getCurrentAssignment());
+    }
+
+    /** Returns a new serializer for {@link RocketMQPartitionSplit}. */
+    @Override
+    public SimpleVersionedSerializer<RocketMQPartitionSplit> getSplitSerializer() {
+        return new RocketMQPartitionSplitSerializer();
+    }
+
+    /** Returns a new serializer for {@link RocketMQSourceEnumState}. */
+    @Override
+    public SimpleVersionedSerializer<RocketMQSourceEnumState> getEnumeratorCheckpointSerializer() {
+        return new RocketMQSourceEnumStateSerializer();
+    }
+
+    /** Returns the type information reported by the deserialization schema. */
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumState.java
similarity index 54%
rename from src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java
rename to src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumState.java
index 264d211..a23139f 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumState.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,28 +16,23 @@
* limitations under the License.
*/
-package org.apache.rocketmq.flink.common.selector;
+package org.apache.rocketmq.flink.source.enumerator;
-public class DefaultTopicSelector<T> implements TopicSelector<T> {
- private final String topicName;
- private final String tagName;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
- public DefaultTopicSelector(final String topicName, final String tagName) {
- this.topicName = topicName;
- this.tagName = tagName;
- }
+import java.util.List;
+import java.util.Map;
- public DefaultTopicSelector(final String topicName) {
- this(topicName, "");
- }
+/** The state of RocketMQ source enumerator. */
+public class RocketMQSourceEnumState {
+
+ private final Map<Integer, List<RocketMQPartitionSplit>> currentAssignment;
- @Override
- public String getTopic(T tuple) {
- return topicName;
+ RocketMQSourceEnumState(Map<Integer, List<RocketMQPartitionSplit>> currentAssignment) {
+ this.currentAssignment = currentAssignment;
}
- @Override
- public String getTag(T tuple) {
- return tagName;
+ public Map<Integer, List<RocketMQPartitionSplit>> getCurrentAssignment() {
+ return currentAssignment;
}
}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumStateSerializer.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumStateSerializer.java
new file mode 100644
index 0000000..ce45b51
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumStateSerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.enumerator;
+
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitSerializer;
+
+import org.apache.flink.connector.base.source.utils.SerdeUtils;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The {@link SimpleVersionedSerializer} for the enumerator state of the RocketMQ source.
+ *
+ * <p>The state is written as the split assignment per reader, every split being encoded with a
+ * {@link RocketMQPartitionSplitSerializer}.
+ */
+public class RocketMQSourceEnumStateSerializer
+        implements SimpleVersionedSerializer<RocketMQSourceEnumState> {
+
+    private static final int CURRENT_VERSION = 0;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    /**
+     * Serializes the current split assignment of the given state.
+     *
+     * @param enumState the enumerator state to write.
+     * @return the serialized assignment.
+     * @throws IOException if the assignment cannot be serialized.
+     */
+    @Override
+    public byte[] serialize(RocketMQSourceEnumState enumState) throws IOException {
+        Map<Integer, List<RocketMQPartitionSplit>> assignment = enumState.getCurrentAssignment();
+        return SerdeUtils.serializeSplitAssignments(
+                assignment, new RocketMQPartitionSplitSerializer());
+    }
+
+    /**
+     * Restores the enumerator state from bytes written by {@link #serialize}.
+     *
+     * @param version the version the bytes were written with.
+     * @param serialized the serialized assignment.
+     * @return the restored enumerator state.
+     * @throws IOException if the version differs from {@link #getVersion()} or the bytes cannot
+     *     be read.
+     */
+    @Override
+    public RocketMQSourceEnumState deserialize(int version, byte[] serialized) throws IOException {
+        // Reject the bytes early when their version is not the one this class writes.
+        if (version != getVersion()) {
+            throw new IOException(
+                    String.format(
+                            "The bytes are serialized with version %d, "
+                                    + "while this deserializer only supports version up to %d",
+                            version, getVersion()));
+        }
+        Map<Integer, List<RocketMQPartitionSplit>> restoredAssignment =
+                SerdeUtils.deserializeSplitAssignments(
+                        serialized, new RocketMQPartitionSplitSerializer(), ArrayList::new);
+        return new RocketMQSourceEnumState(restoredAssignment);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
new file mode 100644
index 0000000..08290c6
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.enumerator;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The enumerator class for RocketMQ source.
+ *
+ * <p>It lists the message queues of the topic, turns every queue that has not been seen before
+ * into a {@link RocketMQPartitionSplit} and pushes the splits to their owner readers as soon as
+ * those readers are registered.
+ */
+@Internal
+public class RocketMQSourceEnumerator
+        implements SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQSourceEnumerator.class);
+
+    /** The topic used for this RocketMQSource. */
+    private final String topic;
+    /** The consumer group used for this RocketMQSource. */
+    private final String consumerGroup;
+    /** The stop timestamp for this RocketMQSource. */
+    private final long stopInMs;
+    /** The start offset for this RocketMQSource. */
+    private final long startOffset;
+    /** The partition discovery interval for this RocketMQSource. */
+    private final long partitionDiscoveryIntervalMs;
+    /** The boundedness of this RocketMQSource. */
+    private final Boundedness boundedness;
+
+    private final SplitEnumeratorContext<RocketMQPartitionSplit> context;
+
+    // The internal states of the enumerator.
+    /**
+     * The partitions known so far, keyed by (topic, broker, queue id). It is seeded in the
+     * constructor from the restored assignment and afterwards only accessed by the partition
+     * discovery callable in the callAsync() method, i.e. the worker thread.
+     */
+    private final Set<Tuple3<String, String, Integer>> discoveredPartitions;
+    /** The current assignment by reader id. Only accessed by the coordinator thread. */
+    private final Map<Integer, List<RocketMQPartitionSplit>> readerIdToSplitAssignments;
+    /**
+     * The discovered and initialized partition splits that are waiting for owner reader to be
+     * ready.
+     */
+    private final Map<Integer, Set<RocketMQPartitionSplit>> pendingPartitionSplitAssignment;
+
+    // Lazily instantiated or mutable fields.
+    private MQPullConsumer consumer;
+    private boolean noMoreNewPartitionSplits = false;
+
+    /** Creates an enumerator without any previous split assignment. */
+    public RocketMQSourceEnumerator(
+            String topic,
+            String consumerGroup,
+            long stopInMs,
+            long startOffset,
+            long partitionDiscoveryIntervalMs,
+            Boundedness boundedness,
+            SplitEnumeratorContext<RocketMQPartitionSplit> context) {
+        this(
+                topic,
+                consumerGroup,
+                stopInMs,
+                startOffset,
+                partitionDiscoveryIntervalMs,
+                boundedness,
+                context,
+                new HashMap<>());
+    }
+
+    /**
+     * Creates an enumerator that starts from the given split assignment.
+     *
+     * @param currentSplitsAssignments the splits already assigned per reader id; their
+     *     partitions are registered as discovered so that they are not assigned again.
+     */
+    public RocketMQSourceEnumerator(
+            String topic,
+            String consumerGroup,
+            long stopInMs,
+            long startOffset,
+            long partitionDiscoveryIntervalMs,
+            Boundedness boundedness,
+            SplitEnumeratorContext<RocketMQPartitionSplit> context,
+            Map<Integer, List<RocketMQPartitionSplit>> currentSplitsAssignments) {
+        this.topic = topic;
+        this.consumerGroup = consumerGroup;
+        this.stopInMs = stopInMs;
+        this.startOffset = startOffset;
+        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
+        this.boundedness = boundedness;
+        this.context = context;
+
+        this.discoveredPartitions = new HashSet<>();
+        this.readerIdToSplitAssignments = new HashMap<>(currentSplitsAssignments);
+        this.readerIdToSplitAssignments.forEach(
+                (reader, splits) ->
+                        splits.forEach(
+                                s ->
+                                        discoveredPartitions.add(
+                                                new Tuple3<>(
+                                                        s.getTopic(),
+                                                        s.getBroker(),
+                                                        s.getPartition()))));
+        this.pendingPartitionSplitAssignment = new HashMap<>();
+    }
+
+    /**
+     * Starts the consumer and schedules the partition discovery.
+     *
+     * <p>The discovery runs periodically when the discovery interval is positive and exactly once
+     * otherwise.
+     */
+    @Override
+    public void start() {
+        initialRocketMQConsumer();
+        LOG.info(
+                "Starting the RocketMQSourceEnumerator for consumer group {} "
+                        + "with partition discovery interval of {} ms.",
+                consumerGroup,
+                partitionDiscoveryIntervalMs);
+        if (partitionDiscoveryIntervalMs > 0) {
+            context.callAsync(
+                    this::discoverAndInitializePartitionSplit,
+                    this::handlePartitionSplitChanges,
+                    0,
+                    partitionDiscoveryIntervalMs);
+        } else {
+            // A periodic task cannot be scheduled with a non-positive period, therefore the
+            // discovery is executed a single time.
+            context.callAsync(
+                    this::discoverAndInitializePartitionSplit, this::handlePartitionSplitChanges);
+        }
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+        // the RocketMQ source pushes splits eagerly, rather than act upon split requests
+    }
+
+    /** Puts the returned splits back into the pending assignment and tries to assign them. */
+    @Override
+    public void addSplitsBack(List<RocketMQPartitionSplit> splits, int subtaskId) {
+        addPartitionSplitChangeToPendingAssignments(splits);
+        assignPendingPartitionSplits();
+    }
+
+    /** Assigns the pending splits now that another reader is registered. */
+    @Override
+    public void addReader(int subtaskId) {
+        LOG.debug(
+                "Adding reader {} to RocketMQSourceEnumerator for consumer group {}.",
+                subtaskId,
+                consumerGroup);
+        assignPendingPartitionSplits();
+        if (boundedness == Boundedness.BOUNDED) {
+            // for RocketMQ bounded source, send this signal to ensure the task can end after
+            // all the splits assigned are completed.
+            context.signalNoMoreSplits(subtaskId);
+        }
+    }
+
+    /** Returns a state object that wraps the current assignment by reader id. */
+    @Override
+    public RocketMQSourceEnumState snapshotState() {
+        return new RocketMQSourceEnumState(readerIdToSplitAssignments);
+    }
+
+    @Override
+    public void close() {
+        if (consumer != null) {
+            consumer.shutdown();
+        }
+    }
+
+    // ----------------- private methods -------------------
+
+    /**
+     * Lists the message queues of the topic and creates a split for every queue that is not yet
+     * part of {@link #discoveredPartitions}.
+     *
+     * @return the splits of the newly discovered partitions.
+     * @throws MQClientException if the message queues cannot be fetched.
+     */
+    private Set<RocketMQPartitionSplit> discoverAndInitializePartitionSplit()
+            throws MQClientException {
+        Set<Tuple3<String, String, Integer>> newPartitions = new HashSet<>();
+        // Starts as a copy of the known partitions; every partition that is still reported by
+        // the broker is taken out again, a failed removal marks a new partition.
+        Set<Tuple3<String, String, Integer>> removedPartitions =
+                new HashSet<>(Collections.unmodifiableSet(discoveredPartitions));
+        Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(topic);
+        for (MessageQueue messageQueue : messageQueues) {
+            Tuple3<String, String, Integer> topicPartition =
+                    new Tuple3<>(
+                            messageQueue.getTopic(),
+                            messageQueue.getBrokerName(),
+                            messageQueue.getQueueId());
+            if (!removedPartitions.remove(topicPartition)) {
+                newPartitions.add(topicPartition);
+            }
+        }
+        discoveredPartitions.addAll(Collections.unmodifiableSet(newPartitions));
+        return newPartitions.stream()
+                .map(
+                        partition ->
+                                new RocketMQPartitionSplit(
+                                        partition.f0,
+                                        partition.f1,
+                                        partition.f2,
+                                        startOffset,
+                                        stopInMs))
+                .collect(Collectors.toSet());
+    }
+
+    // This method should only be invoked in the coordinator executor thread.
+    private void handlePartitionSplitChanges(
+            Set<RocketMQPartitionSplit> partitionSplits, Throwable t) {
+        if (t != null) {
+            throw new FlinkRuntimeException("Failed to handle partition splits change due to ", t);
+        }
+        if (partitionDiscoveryIntervalMs <= 0) {
+            // The discovery ran only once, nothing new will show up after this call.
+            LOG.debug(
+                    "Partition discovery interval is {} ms, no more new partition splits will be "
+                            + "discovered for consumer group {}.",
+                    partitionDiscoveryIntervalMs,
+                    consumerGroup);
+            noMoreNewPartitionSplits = true;
+        }
+        addPartitionSplitChangeToPendingAssignments(partitionSplits);
+        assignPendingPartitionSplits();
+    }
+
+    // This method should only be invoked in the coordinator executor thread.
+    private void addPartitionSplitChangeToPendingAssignments(
+            Collection<RocketMQPartitionSplit> newPartitionSplits) {
+        int numReaders = context.currentParallelism();
+        for (RocketMQPartitionSplit split : newPartitionSplits) {
+            int ownerReader =
+                    getSplitOwner(
+                            split.getTopic(), split.getBroker(), split.getPartition(), numReaders);
+            pendingPartitionSplitAssignment
+                    .computeIfAbsent(ownerReader, r -> new HashSet<>())
+                    .add(split);
+        }
+        LOG.debug(
+                "Assigned {} to {} readers of consumer group {}.",
+                newPartitionSplits,
+                numReaders,
+                consumerGroup);
+    }
+
+    // This method should only be invoked in the coordinator executor thread.
+    private void assignPendingPartitionSplits() {
+        Map<Integer, List<RocketMQPartitionSplit>> incrementalAssignment = new HashMap<>();
+        pendingPartitionSplitAssignment.forEach(
+                (ownerReader, pendingSplits) -> {
+                    if (!pendingSplits.isEmpty()
+                            && context.registeredReaders().containsKey(ownerReader)) {
+                        // The owner reader is ready, assign the split to the owner reader.
+                        incrementalAssignment
+                                .computeIfAbsent(ownerReader, r -> new ArrayList<>())
+                                .addAll(pendingSplits);
+                    }
+                });
+        if (incrementalAssignment.isEmpty()) {
+            // No assignment is made.
+            return;
+        }
+
+        LOG.info("Assigning splits to readers {}", incrementalAssignment);
+        context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
+        incrementalAssignment.forEach(
+                (readerOwner, newPartitionSplits) -> {
+                    // Update the split assignment.
+                    readerIdToSplitAssignments
+                            .computeIfAbsent(readerOwner, r -> new ArrayList<>())
+                            .addAll(newPartitionSplits);
+                    // Clear the pending splits for the reader owner.
+                    pendingPartitionSplitAssignment.remove(readerOwner);
+                    // Sends NoMoreSplitsEvent to the readers if there is no more partition splits
+                    // to be assigned.
+                    if (noMoreNewPartitionSplits) {
+                        LOG.debug(
+                                "No more RocketMQPartitionSplits to assign. Sending NoMoreSplitsEvent to the readers "
+                                        + "in consumer group {}.",
+                                consumerGroup);
+                        context.signalNoMoreSplits(readerOwner);
+                    }
+                });
+    }
+
+    /**
+     * Creates and starts the pull consumer used for the partition discovery.
+     *
+     * @throws FlinkRuntimeException if the consumer cannot be started; the enumerator is not
+     *     usable without it.
+     */
+    private void initialRocketMQConsumer() {
+        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer(consumerGroup);
+        try {
+            pullConsumer.start();
+        } catch (MQClientException e) {
+            pullConsumer.shutdown();
+            // Fail right away instead of keeping a shut down consumer around that would make
+            // every later partition discovery fail.
+            throw new FlinkRuntimeException(
+                    String.format(
+                            "Failed to initial RocketMQ consumer of consumer group %s.",
+                            consumerGroup),
+                    e);
+        }
+        consumer = pullConsumer;
+    }
+
+    /**
+     * Returns the index of the target subtask that a specific RocketMQ partition should be assigned
+     * to.
+     *
+     * <p>The partitions of a single topic and broker are distributed round-robin (strictly
+     * clockwise w.r.t. ascending subtask indices) by using the partition id as the offset from a
+     * starting index, i.e. the index of the subtask which partition 0 will be assigned to. The
+     * starting index is derived from the hash code of {@code topic + "-" + broker}.
+     *
+     * @param topic the RocketMQ topic assigned.
+     * @param broker the RocketMQ broker assigned.
+     * @param partition the RocketMQ partition to assign.
+     * @param numReaders the total number of readers.
+     * @return the id of the subtask that owns the split.
+     */
+    @VisibleForTesting
+    static int getSplitOwner(String topic, String broker, int partition, int numReaders) {
+        // The mask clears the sign bit so that the start index is never negative.
+        int startIndex = (((topic + "-" + broker).hashCode() * 31) & 0x7FFFFFFF) % numReaders;
+
+        // here, the assumption is that the id of RocketMQ partitions are always ascending
+        // starting from 0, and therefore can be used directly as the offset clockwise from the
+        // start index
+        return (startIndex + partition) % numReaders;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
new file mode 100644
index 0000000..3bbeec8
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQRecordDeserializationSchema;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.rocketmq.client.consumer.PullStatus.FOUND;
+
+/**
+ * A {@link SplitReader} implementation that reads records from RocketMQ partitions.
+ *
+ * <p>The returned records are in the format of {@code tuple3(record, offset, timestamp)}.
+ */
+public class RocketMQPartitionSplitReader<T>
+ implements SplitReader<Tuple3<T, Long, Long>, RocketMQPartitionSplit> {
+ private static final Logger LOG = LoggerFactory.getLogger(RocketMQPartitionSplitReader.class);
+
+ private final String topic;
+ private final String tag;
+ private final long stopInMs;
+ private final long startTime;
+ private final long startOffset;
+
+ private final RocketMQRecordDeserializationSchema<T> deserializationSchema;
+ private final Map<Tuple3<String, String, Integer>, Long> startingOffsets;
+ private final Map<Tuple3<String, String, Integer>, Long> stoppingTimestamps;
+ private final SimpleCollector<T> collector;
+
+ private MQPullConsumer consumer;
+
+ private volatile boolean wakeup = false;
+
+ private static final int MAX_MESSAGE_NUMBER_PER_BLOCK = 64;
+
+    /**
+     * Creates the split reader and starts its own pull consumer.
+     *
+     * @param topic the topic whose message queues are fetched.
+     * @param consumerGroup the consumer group of the pull consumer.
+     * @param tag the tag passed to every pull request.
+     * @param stopInMs the stopping timestamp used for partitions without their own one.
+     * @param startTime when positive, the timestamp used to look up the first offset of a
+     *     partition whose starting offset is 0.
+     * @param startOffset the first offset of such a partition when {@code startTime} is not
+     *     positive.
+     * @param deserializationSchema the schema used to deserialize the pulled messages.
+     */
+    public RocketMQPartitionSplitReader(
+            String topic,
+            String consumerGroup,
+            String tag,
+            long stopInMs,
+            long startTime,
+            long startOffset,
+            RocketMQRecordDeserializationSchema<T> deserializationSchema) {
+        // Per partition bookkeeping, filled by handleSplitsChanges().
+        this.startingOffsets = new HashMap<>();
+        this.stoppingTimestamps = new HashMap<>();
+        this.collector = new SimpleCollector<>();
+
+        this.deserializationSchema = deserializationSchema;
+        this.topic = topic;
+        this.tag = tag;
+        this.startTime = startTime;
+        this.startOffset = startOffset;
+        this.stopInMs = stopInMs;
+
+        initialRocketMQConsumer(consumerGroup);
+    }
+
+    /**
+     * Pulls one batch of messages from every message queue that is assigned to this reader.
+     *
+     * <p>A queue is skipped when it has no entry in {@code startingOffsets}. A partition whose
+     * stored offset is 0 gets its first offset from {@code startTime} (when positive) or from
+     * {@code startOffset}. A partition is reported as finished as soon as a message with a store
+     * timestamp after its stopping timestamp shows up.
+     *
+     * @return the records grouped by split id; the result holds only what was collected so far
+     *     when the queue list cannot be fetched, a wake-up was requested or the thread was
+     *     interrupted.
+     * @throws IOException if a message cannot be deserialized.
+     */
+    @Override
+    public RecordsWithSplitIds<Tuple3<T, Long, Long>> fetch() throws IOException {
+        RocketMQPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits =
+                new RocketMQPartitionSplitRecords<>();
+        Set<MessageQueue> messageQueues;
+        try {
+            messageQueues = consumer.fetchSubscribeMessageQueues(topic);
+        } catch (MQClientException e) {
+            LOG.error(
+                    String.format(
+                            "Fetch RocketMQ subscribe message queues of topic[%s] exception.",
+                            topic),
+                    e);
+            recordsBySplits.prepareForRead();
+            return recordsBySplits;
+        }
+        for (MessageQueue messageQueue : messageQueues) {
+            Tuple3<String, String, Integer> topicPartition =
+                    new Tuple3<>(
+                            messageQueue.getTopic(),
+                            messageQueue.getBrokerName(),
+                            messageQueue.getQueueId());
+            if (!startingOffsets.containsKey(topicPartition)) {
+                // The queue is not assigned to this reader.
+                continue;
+            }
+            long messageOffset = startingOffsets.get(topicPartition);
+            if (messageOffset == 0) {
+                try {
+                    messageOffset =
+                            startTime > 0
+                                    ? consumer.searchOffset(messageQueue, startTime)
+                                    : startOffset;
+                } catch (MQClientException e) {
+                    LOG.error(
+                            String.format(
+                                    "Search RocketMQ message offset of topic[%s] broker[%s] queue[%d] exception.",
+                                    messageQueue.getTopic(),
+                                    messageQueue.getBrokerName(),
+                                    messageQueue.getQueueId()),
+                            e);
+                }
+                messageOffset = messageOffset > -1 ? messageOffset : 0;
+            }
+            if (wakeup) {
+                LOG.info(
+                        String.format(
+                                "Wake up pulling messages of topic[%s] broker[%s] queue[%d] tag[%s] from offset[%d].",
+                                messageQueue.getTopic(),
+                                messageQueue.getBrokerName(),
+                                messageQueue.getQueueId(),
+                                tag,
+                                messageOffset));
+                wakeup = false;
+                recordsBySplits.prepareForRead();
+                return recordsBySplits;
+            }
+            PullResult pullResult = null;
+            boolean interrupted = false;
+            try {
+                pullResult =
+                        consumer.pullBlockIfNotFound(
+                                messageQueue, tag, messageOffset, MAX_MESSAGE_NUMBER_PER_BLOCK);
+            } catch (MQClientException
+                    | RemotingException
+                    | MQBrokerException
+                    | InterruptedException e) {
+                if (e instanceof InterruptedException) {
+                    // Restore the interrupt status, it is cleared when the exception is thrown.
+                    Thread.currentThread().interrupt();
+                    interrupted = true;
+                }
+                LOG.error(
+                        String.format(
+                                "Pull RocketMQ messages of topic[%s] broker[%s] queue[%d] tag[%s] from offset[%d] exception.",
+                                messageQueue.getTopic(),
+                                messageQueue.getBrokerName(),
+                                messageQueue.getQueueId(),
+                                tag,
+                                messageOffset),
+                        e);
+            }
+            // Remember where the next pull of this partition has to start.
+            startingOffsets.put(
+                    topicPartition,
+                    pullResult == null ? messageOffset : pullResult.getNextBeginOffset());
+            if (interrupted) {
+                // Do not start further blocking pulls on an interrupted thread.
+                break;
+            }
+            if (pullResult == null || pullResult.getPullStatus() != FOUND) {
+                continue;
+            }
+            Collection<Tuple3<T, Long, Long>> recordsForSplit =
+                    recordsBySplits.recordsForSplit(
+                            messageQueue.getTopic()
+                                    + "-"
+                                    + messageQueue.getBrokerName()
+                                    + "-"
+                                    + messageQueue.getQueueId());
+            // The stopping timestamp only changes when the split is finished, which also ends
+            // the loop below.
+            long stoppingTimestamp = getStoppingTimestamp(topicPartition);
+            for (MessageExt messageExt : pullResult.getMsgFoundList()) {
+                long storeTimestamp = messageExt.getStoreTimestamp();
+                if (storeTimestamp > stoppingTimestamp) {
+                    finishSplitAtRecord(
+                            topicPartition,
+                            stoppingTimestamp,
+                            messageExt.getQueueOffset(),
+                            recordsBySplits);
+                    break;
+                }
+                // Add the record to the partition collector.
+                try {
+                    deserializationSchema.deserialize(
+                            Collections.singletonList(messageExt), collector);
+                    collector
+                            .getRecords()
+                            .forEach(
+                                    r ->
+                                            recordsForSplit.add(
+                                                    new Tuple3<>(
+                                                            r,
+                                                            messageExt.getQueueOffset(),
+                                                            messageExt.getStoreTimestamp())));
+                } catch (Exception e) {
+                    throw new IOException("Failed to deserialize consumer record due to", e);
+                } finally {
+                    collector.reset();
+                }
+            }
+        }
+        recordsBySplits.prepareForRead();
+        return recordsBySplits;
+    }
+
+    /**
+     * Registers the starting offset and the stopping timestamp of every added split.
+     *
+     * @param splitsChange the change to apply; only {@link SplitsAddition} is supported.
+     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}.
+     */
+    @Override
+    public void handleSplitsChanges(SplitsChange<RocketMQPartitionSplit> splitsChange) {
+        boolean isAddition = splitsChange instanceof SplitsAddition;
+        if (!isAddition) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "The SplitChange type of %s is not supported.",
+                            splitsChange.getClass()));
+        }
+        // Record the starting offset and the stopping timestamp per partition.
+        for (RocketMQPartitionSplit split : splitsChange.splits()) {
+            Tuple3<String, String, Integer> partitionKey =
+                    new Tuple3<>(split.getTopic(), split.getBroker(), split.getPartition());
+            startingOffsets.put(partitionKey, split.getStartingOffset());
+            stoppingTimestamps.put(partitionKey, split.getStoppingTimestamp());
+        }
+    }
+
+    /** Raises the {@code wakeup} flag after emitting a debug log entry. */
+    @Override
+    public void wakeUp() {
+        LOG.debug("Wake up the split reader in case the fetcher thread is blocking in fetch().");
+        // NOTE(review): presumably fetch() polls this flag to return early - confirm.
+        this.wakeup = true;
+    }
+
+    /** Releases the reader's resources by shutting down the RocketMQ pull consumer. */
+    @Override
+    public void close() {
+        this.consumer.shutdown();
+    }
+
+ private void finishSplitAtRecord(
+ Tuple3<String, String, Integer> topicPartition,
+ long stoppingTimestamp,
+ long currentOffset,
+ RocketMQPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits) {
+ LOG.debug(
+ "{} has reached stopping timestamp {}, current offset is {}",
+ topicPartition.f0 + "-" + topicPartition.f1,
+ stoppingTimestamp,
+ currentOffset);
+ recordsBySplits.addFinishedSplit(RocketMQPartitionSplit.toSplitId(topicPartition));
+ startingOffsets.remove(topicPartition);
+ stoppingTimestamps.remove(topicPartition);
+ }
+
+ private long getStoppingTimestamp(Tuple3<String, String, Integer> topicPartition) {
+ return stoppingTimestamps.getOrDefault(topicPartition, stopInMs);
+ }
+
+ // --------------- private helper methods ----------------------
+
+    /**
+     * Creates the pull consumer for the given consumer group and starts it.
+     *
+     * <p>A failure to start is logged and the consumer is shut down; no exception is propagated.
+     * NOTE(review): the reader then keeps a shut-down consumer - confirm that callers can cope
+     * with this, or consider failing fast instead.
+     *
+     * @param consumerGroup the consumer group the pull consumer is created for.
+     */
+    private void initialRocketMQConsumer(String consumerGroup) {
+        try {
+            this.consumer = new DefaultMQPullConsumer(consumerGroup);
+            this.consumer.start();
+        } catch (MQClientException startFailure) {
+            LOG.error("Failed to initial RocketMQ consumer.", startFailure);
+            this.consumer.shutdown();
+        }
+    }
+
+ // ---------------- private helper classes ------------------------
+
+    /**
+     * A batch of records grouped by split id, together with the ids of the splits that finished
+     * while the batch was assembled.
+     *
+     * <p>Records are added through {@code recordsForSplit}; {@code prepareForRead} must be called
+     * before the batch is iterated with {@link #nextSplit()} and {@link #nextRecordFromSplit()}.
+     */
+    private static class RocketMQPartitionSplitRecords<T> implements RecordsWithSplitIds<T> {
+        private final Map<String, Collection<T>> recordsBySplits;
+        private final Set<String> finishedSplits;
+        private Iterator<Map.Entry<String, Collection<T>>> splitIterator;
+        private String currentSplitId;
+        private Iterator<T> recordIterator;
+
+        public RocketMQPartitionSplitRecords() {
+            this.finishedSplits = new HashSet<>();
+            this.recordsBySplits = new HashMap<>();
+        }
+
+        /** Returns the record collection of the given split, creating it on first access. */
+        private Collection<T> recordsForSplit(String splitId) {
+            return recordsBySplits.computeIfAbsent(splitId, ignoredKey -> new ArrayList<>());
+        }
+
+        /** Remembers that the given split has been fully consumed. */
+        private void addFinishedSplit(String splitId) {
+            finishedSplits.add(splitId);
+        }
+
+        /** Positions the split iterator at the beginning of the collected records. */
+        private void prepareForRead() {
+            splitIterator = recordsBySplits.entrySet().iterator();
+        }
+
+        /** Advances to the next split; returns its id, or {@code null} when none is left. */
+        @Override
+        @Nullable
+        public String nextSplit() {
+            if (!splitIterator.hasNext()) {
+                // Exhausted: clear the cursor so a later record read fails its precondition.
+                currentSplitId = null;
+                recordIterator = null;
+                return null;
+            }
+            final Map.Entry<String, Collection<T>> nextEntry = splitIterator.next();
+            currentSplitId = nextEntry.getKey();
+            recordIterator = nextEntry.getValue().iterator();
+            return currentSplitId;
+        }
+
+        /** Returns the next record of the current split, or {@code null} when it is drained. */
+        @Override
+        @Nullable
+        public T nextRecordFromSplit() {
+            Preconditions.checkNotNull(
+                    currentSplitId,
+                    "Make sure nextSplit() did not return null before "
+                            + "iterate over the records split.");
+            return recordIterator.hasNext() ? recordIterator.next() : null;
+        }
+
+        @Override
+        public Set<String> finishedSplits() {
+            return finishedSplits;
+        }
+    }
+
+    /**
+     * A {@link Collector} that buffers collected records in an in-memory list until the list is
+     * cleared with {@code reset}.
+     */
+    private static class SimpleCollector<T> implements Collector<T> {
+        private final List<T> buffer = new ArrayList<>();
+
+        @Override
+        public void collect(T record) {
+            buffer.add(record);
+        }
+
+        /** Nothing to release; the buffer is kept as is. */
+        @Override
+        public void close() {}
+
+        /** Returns the live buffer (not a copy) of the records collected so far. */
+        private List<T> getRecords() {
+            return buffer;
+        }
+
+        /** Empties the buffer so the collector can be reused. */
+        private void reset() {
+            buffer.clear();
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQRecordEmitter.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQRecordEmitter.java
new file mode 100644
index 0000000..25270b5
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQRecordEmitter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader;
+
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitState;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+
+/** The {@link RecordEmitter} implementation for {@link RocketMQSourceReader}. */
+/**
+ * The {@link RecordEmitter} implementation for {@link RocketMQSourceReader}.
+ *
+ * <p>Each element is a tuple whose {@code f0} is the record to emit, {@code f1} the value used to
+ * advance the split's current offset, and {@code f2} the timestamp passed to the output.
+ */
+public class RocketMQRecordEmitter<T>
+        implements RecordEmitter<Tuple3<T, Long, Long>, T, RocketMQPartitionSplitState> {
+
+    /**
+     * Emits the record with its timestamp and then moves the split state one past the record's
+     * offset.
+     *
+     * @param element the (record, offset, timestamp) tuple to emit.
+     * @param output the output receiving the record.
+     * @param splitState the state of the split the record belongs to.
+     */
+    @Override
+    public void emitRecord(
+            Tuple3<T, Long, Long> element,
+            SourceOutput<T> output,
+            RocketMQPartitionSplitState splitState) {
+        final T payload = element.f0;
+        final long timestamp = element.f2;
+        output.collect(payload, timestamp);
+
+        final long nextOffset = element.f1 + 1;
+        splitState.setCurrentOffset(nextOffset);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQSourceReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQSourceReader.java
new file mode 100644
index 0000000..0257e34
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQSourceReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader;
+
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitState;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+import java.util.Map;
+import java.util.function.Supplier;
+
+/** The source reader for RocketMQ partitions. */
+public class RocketMQSourceReader<T>
+ extends SingleThreadMultiplexSourceReaderBase<
+ Tuple3<T, Long, Long>, T, RocketMQPartitionSplit, RocketMQPartitionSplitState> {
+
+ public RocketMQSourceReader(
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> elementsQueue,
+ Supplier<SplitReader<Tuple3<T, Long, Long>, RocketMQPartitionSplit>>
+ splitReaderSupplier,
+ RecordEmitter<Tuple3<T, Long, Long>, T, RocketMQPartitionSplitState> recordEmitter,
+ Configuration config,
+ SourceReaderContext context) {
+ super(elementsQueue, splitReaderSupplier, recordEmitter, config, context);
+ }
+
+ @Override
+ protected void onSplitFinished(Map<String, RocketMQPartitionSplitState> map) {}
+
+ @Override
+ protected RocketMQPartitionSplitState initializedState(RocketMQPartitionSplit partitionSplit) {
+ return new RocketMQPartitionSplitState(partitionSplit);
+ }
+
+ @Override
+ protected RocketMQPartitionSplit toSplitType(
+ String splitId, RocketMQPartitionSplitState splitState) {
+ return splitState.toRocketMQPartitionSplit();
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java
new file mode 100644
index 0000000..455f8af
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java
@@ -0,0 +1,43 @@
+package org.apache.rocketmq.flink.source.reader.deserializer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/** An interface for the deserialization of RocketMQ records. */
+/**
+ * Describes how RocketMQ {@link MessageExt} records are turned into elements of type {@code T}.
+ *
+ * <p>The produced type is exposed through {@link ResultTypeQueryable}.
+ */
+public interface RocketMQRecordDeserializationSchema<T>
+        extends Serializable, ResultTypeQueryable<T> {
+
+    /**
+     * One-time setup hook of the schema, intended to run before {@link #deserialize} is used.
+     *
+     * <p>The supplied {@link InitializationContext} gives access to additional features, for
+     * example the registration of user metrics. The default implementation does nothing.
+     *
+     * @param context contextual information available during initialization.
+     * @throws Exception if the setup fails.
+     */
+    @PublicEvolving
+    default void open(InitializationContext context) throws Exception {}
+
+    /**
+     * Deserializes the given RocketMQ messages.
+     *
+     * <p>Any number of records may be emitted through the {@link Collector}. Keep the number and
+     * size of the produced records relatively small: depending on the source implementation they
+     * can be buffered in memory, or collecting them might delay the emission of checkpoint
+     * barriers.
+     *
+     * @param record the {@link MessageExt}s to deserialize.
+     * @param out the collector receiving the resulting records.
+     * @throws IOException if the messages cannot be deserialized.
+     */
+    @PublicEvolving
+    void deserialize(List<MessageExt> record, Collector<T> out) throws IOException;
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java
new file mode 100644
index 0000000..9bda60f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.split;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.Objects;
+
+/** A {@link SourceSplit} for a RocketMQ partition. */
+/**
+ * A {@link SourceSplit} for a RocketMQ partition.
+ *
+ * <p>A split is identified by its topic, broker and partition (queue id); it additionally carries
+ * the offset to start reading from and the timestamp at which reading stops. The broker takes
+ * part in {@link #equals}, {@link #hashCode} and {@link #toString}, consistent with {@link
+ * #splitId()}, so that splits on different brokers are never treated as the same split.
+ */
+public class RocketMQPartitionSplit implements SourceSplit {
+
+    private final String topic;
+    private final String broker;
+    private final int partition;
+    private final long startingOffset;
+    private final long stoppingTimestamp;
+
+    public RocketMQPartitionSplit(
+            String topic,
+            String broker,
+            int partition,
+            long startingOffset,
+            long stoppingTimestamp) {
+        this.topic = topic;
+        this.broker = broker;
+        this.partition = partition;
+        this.startingOffset = startingOffset;
+        this.stoppingTimestamp = stoppingTimestamp;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public String getBroker() {
+        return broker;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public long getStartingOffset() {
+        return startingOffset;
+    }
+
+    public long getStoppingTimestamp() {
+        return stoppingTimestamp;
+    }
+
+    /** Returns the split id in the form {@code topic-broker-partition}. */
+    @Override
+    public String splitId() {
+        return topic + "-" + broker + "-" + partition;
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "[Topic: %s, Broker: %s, Partition: %s, StartingOffset: %d, StoppingTimestamp: %d]",
+                topic, broker, partition, startingOffset, stoppingTimestamp);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topic, broker, partition, startingOffset, stoppingTimestamp);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof RocketMQPartitionSplit)) {
+            return false;
+        }
+        RocketMQPartitionSplit other = (RocketMQPartitionSplit) obj;
+        // Objects.equals keeps the comparison null-safe for topic and broker.
+        return Objects.equals(topic, other.topic)
+                && Objects.equals(broker, other.broker)
+                && partition == other.partition
+                && startingOffset == other.startingOffset
+                && stoppingTimestamp == other.stoppingTimestamp;
+    }
+
+    /**
+     * Builds a split id from a (topic, broker, partition) tuple.
+     *
+     * @param topicPartition the tuple holding topic, broker and partition.
+     * @return the id in the same {@code topic-broker-partition} form as {@link #splitId()}.
+     */
+    public static String toSplitId(Tuple3<String, String, Integer> topicPartition) {
+        return topicPartition.f0 + "-" + topicPartition.f1 + "-" + topicPartition.f2;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitSerializer.java b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitSerializer.java
new file mode 100644
index 0000000..2363257
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitSerializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.split;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** The {@link SimpleVersionedSerializer serializer} for {@link RocketMQPartitionSplit}. */
+/**
+ * The {@link SimpleVersionedSerializer serializer} for {@link RocketMQPartitionSplit}.
+ *
+ * <p>The binary layout is: topic (UTF), broker (UTF), partition (int), starting offset (long),
+ * stopping timestamp (long).
+ */
+public class RocketMQPartitionSplitSerializer
+        implements SimpleVersionedSerializer<RocketMQPartitionSplit> {
+
+    private static final int CURRENT_VERSION = 0;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    /**
+     * Writes the fields of the split in the layout documented on this class.
+     *
+     * @param split the split to serialize.
+     * @return the serialized bytes.
+     * @throws IOException if writing to the in-memory stream fails.
+     */
+    @Override
+    public byte[] serialize(RocketMQPartitionSplit split) throws IOException {
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        try (DataOutputStream writer = new DataOutputStream(buffer)) {
+            writer.writeUTF(split.getTopic());
+            writer.writeUTF(split.getBroker());
+            writer.writeInt(split.getPartition());
+            writer.writeLong(split.getStartingOffset());
+            writer.writeLong(split.getStoppingTimestamp());
+            writer.flush();
+        }
+        return buffer.toByteArray();
+    }
+
+    /**
+     * Reads a split back in the order it was written. The {@code version} argument is not
+     * inspected.
+     *
+     * @param version the version the bytes were written with.
+     * @param serialized the serialized split.
+     * @return the restored split.
+     * @throws IOException if the bytes cannot be read.
+     */
+    @Override
+    public RocketMQPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
+        try (DataInputStream reader = new DataInputStream(new ByteArrayInputStream(serialized))) {
+            // Constructor arguments are evaluated left to right, matching the write order.
+            return new RocketMQPartitionSplit(
+                    reader.readUTF(),
+                    reader.readUTF(),
+                    reader.readInt(),
+                    reader.readLong(),
+                    reader.readLong());
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitState.java b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitState.java
new file mode 100644
index 0000000..4fbb3da
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitState.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.split;
+
+/** This class extends RocketMQPartitionSplit to track a mutable current offset. */
+/**
+ * A {@link RocketMQPartitionSplit} that additionally tracks a mutable current offset, which
+ * starts at the split's starting offset.
+ */
+public class RocketMQPartitionSplitState extends RocketMQPartitionSplit {
+
+    private long currentOffset;
+
+    /**
+     * Copies all fields of the given split and initializes the current offset with the split's
+     * starting offset.
+     *
+     * @param partitionSplit the split whose state is tracked.
+     */
+    public RocketMQPartitionSplitState(RocketMQPartitionSplit partitionSplit) {
+        super(
+                partitionSplit.getTopic(),
+                partitionSplit.getBroker(),
+                partitionSplit.getPartition(),
+                partitionSplit.getStartingOffset(),
+                partitionSplit.getStoppingTimestamp());
+        this.currentOffset = partitionSplit.getStartingOffset();
+    }
+
+    public long getCurrentOffset() {
+        return this.currentOffset;
+    }
+
+    public void setCurrentOffset(long currentOffset) {
+        this.currentOffset = currentOffset;
+    }
+
+    /**
+     * Creates an immutable split that resumes from the current offset.
+     *
+     * @return a new RocketMQPartitionSplit whose starting offset is the current offset; all other
+     *     fields are taken over unchanged.
+     */
+    public RocketMQPartitionSplit toRocketMQPartitionSplit() {
+        final long resumeOffset = getCurrentOffset();
+        return new RocketMQPartitionSplit(
+                getTopic(), getBroker(), getPartition(), resumeOffset, getStoppingTimestamp());
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java b/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java
deleted file mode 100644
index 6738ec3..0000000
--- a/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink;
-
-import java.util.Properties;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.flink.common.selector.DefaultTopicSelector;
-import org.apache.rocketmq.flink.common.selector.TopicSelector;
-import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema;
-import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueSerializationSchema;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.apache.rocketmq.flink.common.util.TestUtils.setFieldValue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-@Ignore
-public class RocketMQSinkTest {
-
- private RocketMQSink rocketMQSink;
- private DefaultMQProducer producer;
-
- @Before
- public void setUp() throws Exception {
- KeyValueSerializationSchema serializationSchema = new SimpleKeyValueSerializationSchema("id", "name");
- TopicSelector topicSelector = new DefaultTopicSelector("tpc");
- Properties props = new Properties();
- props.setProperty(RocketMQConfig.MSG_DELAY_LEVEL, String.valueOf(RocketMQConfig.MSG_DELAY_LEVEL04));
- rocketMQSink = new RocketMQSink(props);
-
- producer = mock(DefaultMQProducer.class);
- setFieldValue(rocketMQSink, "producer", producer);
- }
-
- @Test
- public void testSink() throws Exception {
- Tuple2<String, String> tuple = new Tuple2<>("id", "province");
- String topic = "testTopic";
- String tag = "testTag";
- Message message = new Message(topic, tag, tuple.f0, tuple.f1.getBytes());
- }
-
- @Test
- public void close() throws Exception {
- rocketMQSink.close();
-
- verify(producer).shutdown();
- }
-
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java
deleted file mode 100644
index 2f4685c..0000000
--- a/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.selector;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class DefaultTopicSelectorTest {
- @Test
- public void getTopic() throws Exception {
- DefaultTopicSelector selector = new DefaultTopicSelector("rocket");
- assertEquals("rocket", selector.getTopic(null));
- assertEquals("", selector.getTag(null));
-
- selector = new DefaultTopicSelector("rocket", "tg");
- assertEquals("rocket", selector.getTopic(null));
- assertEquals("tg", selector.getTag(null));
- }
-
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java
deleted file mode 100644
index 6ac1a57..0000000
--- a/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.selector;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class SimpleTopicSelectorTest {
- @Test
- public void getTopic() throws Exception {
- SimpleTopicSelector selector = new SimpleTopicSelector("tpc", "dtpc", "tg", "dtg");
- Map tuple = new HashMap();
- tuple.put("id", "x001");
- tuple.put("name", "vesense");
- tuple.put("tpc", "tpc1");
- tuple.put("tg", "tg1");
-
- assertEquals("tpc1", selector.getTopic(tuple));
- assertEquals("tg1", selector.getTag(tuple));
-
- tuple = new HashMap();
- tuple.put("id", "x001");
- tuple.put("name", "vesense");
-
- assertEquals("dtpc", selector.getTopic(tuple));
- assertEquals("dtg", selector.getTag(tuple));
- }
-
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java
deleted file mode 100644
index 98aa793..0000000
--- a/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.serialization;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class SimpleKeyValueSerializationSchemaTest {
- @Test
- public void serializeKeyAndValue() throws Exception {
- SimpleKeyValueSerializationSchema serializationSchema = new SimpleKeyValueSerializationSchema("id", "name");
- SimpleKeyValueDeserializationSchema deserializationSchema = new SimpleKeyValueDeserializationSchema("id", "name");
-
- Map tuple = new HashMap();
- tuple.put("id", "x001");
- tuple.put("name", "vesense");
-
- assertEquals(tuple, deserializationSchema.deserializeKeyAndValue(serializationSchema.serializeKey(tuple),
- serializationSchema.serializeValue(tuple)));
- }
-
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java
new file mode 100644
index 0000000..c45dbdf
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.flink.legacy.common.selector.DefaultTopicSelector;
+import org.apache.rocketmq.flink.legacy.common.selector.TopicSelector;
+import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueSerializationSchema;
+import org.apache.rocketmq.flink.legacy.common.serialization.SimpleKeyValueSerializationSchema;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.apache.rocketmq.flink.legacy.common.util.TestUtils.setFieldValue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/** Tests for {@link RocketMQSink}; the whole class is currently disabled via {@link Ignore}. */
+@Ignore
+public class RocketMQSinkTest {
+
+    private RocketMQSink rocketMQSink;
+    private DefaultMQProducer producer;
+
+    /** Builds a sink with a delay-level property and injects a mocked producer into it. */
+    @Before
+    public void setUp() throws Exception {
+        KeyValueSerializationSchema schema = new SimpleKeyValueSerializationSchema("id", "name");
+        TopicSelector selector = new DefaultTopicSelector("tpc");
+
+        Properties sinkProperties = new Properties();
+        sinkProperties.setProperty(
+                RocketMQConfig.MSG_DELAY_LEVEL, String.valueOf(RocketMQConfig.MSG_DELAY_LEVEL04));
+        rocketMQSink = new RocketMQSink(sinkProperties);
+
+        // Swap the sink's producer for a mock so that no broker connection is needed.
+        producer = mock(DefaultMQProducer.class);
+        setFieldValue(rocketMQSink, "producer", producer);
+    }
+
+    /** Only constructs a message; no assertion is made on the sink yet. */
+    @Test
+    public void testSink() throws Exception {
+        Tuple2<String, String> keyAndValue = new Tuple2<>("id", "province");
+        String targetTopic = "testTopic";
+        String targetTag = "testTag";
+        Message built =
+                new Message(targetTopic, targetTag, keyAndValue.f0, keyAndValue.f1.getBytes());
+    }
+
+    /** Closing the sink must shut down its producer. */
+    @Test
+    public void close() throws Exception {
+        rocketMQSink.close();
+
+        verify(producer).shutdown();
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
similarity index 77%
rename from src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java
rename to src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
index 2f16a96..a863ddd 100644
--- a/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
@@ -1,43 +1,42 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.flink.legacy;
-package org.apache.rocketmq.flink;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema;
-import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema;
+import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema;
+import org.apache.rocketmq.flink.legacy.common.serialization.SimpleKeyValueDeserializationSchema;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-import static org.apache.rocketmq.flink.common.util.TestUtils.setFieldValue;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.rocketmq.flink.legacy.common.util.TestUtils.setFieldValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
@@ -87,7 +86,8 @@ public class RocketMQSourceTest {
PullResult pullResult = new PullResult(PullStatus.FOUND, 3, 1, 5, msgFoundList);
when(consumer.fetchConsumeOffset(any(MessageQueue.class), anyBoolean())).thenReturn(2L);
- when(consumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenReturn(pullResult);
+ when(consumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt()))
+ .thenReturn(pullResult);
SourceContext context = mock(SourceContext.class);
when(context.getCheckpointLock()).thenReturn(new Object());
@@ -118,4 +118,4 @@ public class RocketMQSourceTest {
return false;
}
}
-}
\ No newline at end of file
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java
new file mode 100644
index 0000000..b235c63
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.selector;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link DefaultTopicSelector}. */
+public class DefaultTopicSelectorTest {
+    @Test
+    public void getTopic() throws Exception {
+        DefaultTopicSelector topicOnly = new DefaultTopicSelector("rocket");
+        assertEquals("rocket", topicOnly.getTopic(null));
+        assertEquals("", topicOnly.getTag(null)); // no tag supplied: empty tag expected
+        DefaultTopicSelector topicAndTag = new DefaultTopicSelector("rocket", "tg");
+        assertEquals("rocket", topicAndTag.getTopic(null));
+        assertEquals("tg", topicAndTag.getTag(null));
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java
new file mode 100644
index 0000000..5c0f755
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.selector;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link SimpleTopicSelector}. */
+public class SimpleTopicSelectorTest {
+    @Test
+    public void getTopic() throws Exception {
+        SimpleTopicSelector selector = new SimpleTopicSelector("tpc", "dtpc", "tg", "dtg");
+        // Tuple carrying both the "tpc" and "tg" fields: their values are expected back.
+        Map<String, String> withFields = new HashMap<>();
+        withFields.put("id", "x001");
+        withFields.put("name", "vesense");
+        withFields.put("tpc", "tpc1");
+        withFields.put("tg", "tg1");
+        assertEquals("tpc1", selector.getTopic(withFields));
+        assertEquals("tg1", selector.getTag(withFields));
+        // Tuple without those fields: the "dtpc"/"dtg" defaults are expected instead.
+        Map<String, String> withoutFields = new HashMap<>();
+        withoutFields.put("id", "x001");
+        withoutFields.put("name", "vesense");
+        assertEquals("dtpc", selector.getTopic(withoutFields));
+        assertEquals("dtg", selector.getTag(withoutFields));
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
new file mode 100644
index 0000000..78baf20
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/** Round-trip test for the simple key/value serialization and deserialization schemas. */
+public class SimpleKeyValueSerializationSchemaTest {
+    @Test
+    public void serializeKeyAndValue() throws Exception {
+        SimpleKeyValueSerializationSchema serializer =
+                new SimpleKeyValueSerializationSchema("id", "name");
+        SimpleKeyValueDeserializationSchema deserializer =
+                new SimpleKeyValueDeserializationSchema("id", "name");
+
+        Map<String, String> tuple = new HashMap<>();
+        tuple.put("id", "x001");
+        tuple.put("name", "vesense");
+        // Key and value are serialized separately; deserializing both must rebuild the tuple.
+        assertEquals(
+                tuple,
+                deserializer.deserializeKeyAndValue(
+                        serializer.serializeKey(tuple), serializer.serializeValue(tuple)));
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumStateSerializerTest.java b/src/test/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumStateSerializerTest.java
new file mode 100644
index 0000000..45ff0e3
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumStateSerializerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.enumerator;
+
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link RocketMQSourceEnumStateSerializer}. */
+public class RocketMQSourceEnumStateSerializerTest {
+
+    /** A state pushed through serialize and deserialize must keep its current assignment. */
+    @Test
+    public void testSerializeDeserializeSourceEnumState() throws IOException {
+        RocketMQSourceEnumStateSerializer serializer = new RocketMQSourceEnumStateSerializer();
+        RocketMQSourceEnumState expected = prepareSourceEnumeratorState();
+        // The version is hard-coded to 0 here instead of being read from the serializer.
+        byte[] serialized = serializer.serialize(expected);
+        RocketMQSourceEnumState actual = serializer.deserialize(0, serialized);
+        assertEquals(expected.getCurrentAssignment(), actual.getCurrentAssignment());
+    }
+
+    /** Builds the state under test: ten splits stored under the assignment keys 0, 1 and 2. */
+    private RocketMQSourceEnumState prepareSourceEnumeratorState() {
+        SplitsAssignment<RocketMQPartitionSplit> pendingAssignment =
+                new SplitsAssignment<>(new HashMap<>());
+        pendingAssignment
+                .assignment()
+                .put(
+                        0,
+                        Arrays.asList(
+                                newSplit("0", "taobaodaily-01", 1),
+                                newSplit("3", "taobaodaily-01", 2),
+                                newSplit("6", "taobaodaily-01", 3),
+                                newSplit("9", "taobaodaily-01", 4)));
+        pendingAssignment
+                .assignment()
+                .put(
+                        1,
+                        Arrays.asList(
+                                newSplit("1", "taobaodaily-02", 5),
+                                newSplit("4", "taobaodaily-02", 6),
+                                newSplit("7", "taobaodaily-02", 7)));
+        pendingAssignment
+                .assignment()
+                .put(
+                        2,
+                        Arrays.asList(
+                                newSplit("2", "taobaodaily-03", 8),
+                                newSplit("5", "taobaodaily-03", 9),
+                                newSplit("8", "taobaodaily-03", 10)));
+        return new RocketMQSourceEnumState(pendingAssignment.assignment());
+    }
+
+    /** Creates a split at starting offset 0 whose stopping timestamp is the current time. */
+    private static RocketMQPartitionSplit newSplit(String topic, String broker, int partition) {
+        // The clock is read anew for every split, so splits may carry different timestamps.
+        return new RocketMQPartitionSplit(topic, broker, partition, 0, System.currentTimeMillis());
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/source/reader/RocketMQRecordEmitterTest.java b/src/test/java/org/apache/rocketmq/flink/source/reader/RocketMQRecordEmitterTest.java
new file mode 100644
index 0000000..83c1c4b
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/source/reader/RocketMQRecordEmitterTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitState;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link RocketMQRecordEmitter}. */
+public class RocketMQRecordEmitterTest {
+
+    /** Emitting a record at the starting offset must advance the split state by one offset. */
+    @Test
+    public void testEmitRecord() {
+        RocketMQRecordEmitter<RowData> recordEmitter = new RocketMQRecordEmitter<>();
+        long now = System.currentTimeMillis();
+        InetSocketAddress bornHost = InetSocketAddress.createUnresolved("localhost", 8080);
+        InetSocketAddress storeHost = InetSocketAddress.createUnresolved("localhost", 8088);
+        MessageExt message = new MessageExt(1, now, bornHost, now, storeHost, "184019387");
+        // Explicit charset keeps the body bytes independent of the platform default encoding.
+        message.setBody(
+                "test_emit_record_message".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        GenericRowData rowData = new GenericRowData(1);
+        rowData.setField(0, message.getBody());
+        String topic = "test-record-emitter";
+        String broker = "taobaodaily";
+        int partition = 256;
+        long startingOffset = 100;
+        long stopAt = now;
+        Tuple3<RowData, Long, Long> record = new Tuple3<>(rowData, startingOffset, now);
+        RocketMQPartitionSplit split =
+                new RocketMQPartitionSplit(topic, broker, partition, startingOffset, stopAt);
+        RocketMQPartitionSplitState splitState = new RocketMQPartitionSplitState(split);
+        recordEmitter.emitRecord(record, new TestingEmitterOutput<>(), splitState);
+        assertEquals(
+                new RocketMQPartitionSplit(topic, broker, partition, startingOffset + 1, stopAt),
+                splitState.toRocketMQPartitionSplit());
+    }
+
+    /** Output stub: discards records and rejects watermark and idleness calls. */
+    private static final class TestingEmitterOutput<E> implements ReaderOutput<E> {
+        @Override
+        public void collect(E record) {}
+
+        @Override
+        public void collect(E record, long timestamp) {
+            this.collect(record);
+        }
+
+        @Override
+        public void emitWatermark(Watermark watermark) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void markIdle() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public SourceOutput<E> createOutputForSplit(String splitId) {
+            return this;
+        }
+
+        @Override
+        public void releaseOutputForSplit(String splitId) {}
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitSerializerTest.java b/src/test/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitSerializerTest.java
new file mode 100644
index 0000000..b56cc9d
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitSerializerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.split;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link RocketMQPartitionSplitSerializer}. */
+public class RocketMQPartitionSplitSerializerTest {
+
+    /** A split written and read back with the serializer's own version must equal the input. */
+    @Test
+    public void testSerializePartitionSplit() throws IOException {
+        RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer();
+        long stoppingTimestamp = System.currentTimeMillis();
+        RocketMQPartitionSplit expected =
+                new RocketMQPartitionSplit(
+                        "test-split-serialization", "taobaodaily", 256, 100, stoppingTimestamp);
+        // Round trip: the version is read first, then the split is serialized and read back.
+        int version = serializer.getVersion();
+        byte[] serialized = serializer.serialize(expected);
+        RocketMQPartitionSplit actual = serializer.deserialize(version, serialized);
+        assertEquals(expected, actual);
+    }
+}