You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:39 UTC

[rocketmq-flink] 26/33: [#705] Support the implementation of new Source interface (#706)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit f882acfc1b6336ed24564bd992e15b87e01e66fd
Author: SteNicholas <pr...@163.com>
AuthorDate: Mon Apr 19 14:01:18 2021 +0800

    [#705] Support the implementation of new Source interface (#706)
---
 README.md                                          |   8 +-
 pom.xml                                            |  57 ++-
 .../org/apache/rocketmq/flink/RocketMQSource.java  | 433 -----------------
 .../org/apache/rocketmq/flink/RunningChecker.java  |  33 --
 .../flink/common/selector/TopicSelector.java       |  28 --
 .../KeyValueDeserializationSchema.java             |  27 --
 .../serialization/KeyValueSerializationSchema.java |  28 --
 .../rocketmq/flink/common/util/MetricUtils.java    |  80 ----
 .../rocketmq/flink/common/util/RocketMQUtils.java  |  73 ---
 .../rocketmq/flink/common/util/TestUtils.java      |  33 --
 .../flink/{ => legacy}/RocketMQConfig.java         |  70 +--
 .../rocketmq/flink/{ => legacy}/RocketMQSink.java  |  98 ++--
 .../rocketmq/flink/legacy/RocketMQSource.java      | 528 +++++++++++++++++++++
 .../rocketmq/flink/legacy/RunningChecker.java      |  29 ++
 .../common/selector/DefaultTopicSelector.java      |  39 ++
 .../common/selector/SimpleTopicSelector.java       |  46 +-
 .../legacy/common/selector/TopicSelector.java      |  24 +
 .../ForwardMessageExtDeserialization.java          |  14 +-
 .../KeyValueDeserializationSchema.java             |  23 +
 .../serialization/KeyValueSerializationSchema.java |  24 +
 .../MessageExtDeserializationScheme.java           |   8 +-
 .../SimpleKeyValueDeserializationSchema.java       |  33 +-
 .../SimpleKeyValueSerializationSchema.java         |  28 +-
 .../SimpleTupleDeserializationSchema.java          |   7 +-
 .../flink/legacy/common/util/MetricUtils.java      |  85 ++++
 .../flink/{ => legacy}/common/util/RetryUtil.java  |  27 +-
 .../flink/legacy/common/util/RocketMQUtils.java    |  79 +++
 .../flink/legacy/common/util/TestUtils.java        |  29 ++
 .../watermark/BoundedOutOfOrdernessGenerator.java  |  18 +-
 .../BoundedOutOfOrdernessGeneratorPerQueue.java    |  27 +-
 .../common/watermark/PunctuatedAssigner.java       |  23 +-
 .../watermark/TimeLagWatermarkGenerator.java       |  19 +-
 .../common/watermark/WaterMarkForAll.java          |  10 +-
 .../common/watermark/WaterMarkPerQueue.java        |  18 +-
 .../{ => legacy}/example/RocketMQFlinkExample.java |  46 +-
 .../flink/{ => legacy}/example/SimpleConsumer.java |  31 +-
 .../flink/{ => legacy}/example/SimpleProducer.java |  14 +-
 .../{ => legacy}/function/SinkMapFunction.java     |  14 +-
 .../{ => legacy}/function/SourceMapFunction.java   |   9 +-
 .../rocketmq/flink/source/RocketMQSource.java      | 175 +++++++
 .../enumerator/RocketMQSourceEnumState.java}       |  33 +-
 .../RocketMQSourceEnumStateSerializer.java         |  64 +++
 .../enumerator/RocketMQSourceEnumerator.java       | 337 +++++++++++++
 .../reader/RocketMQPartitionSplitReader.java       | 373 +++++++++++++++
 .../flink/source/reader/RocketMQRecordEmitter.java |  39 ++
 .../flink/source/reader/RocketMQSourceReader.java  |  64 +++
 .../RocketMQRecordDeserializationSchema.java       |  43 ++
 .../flink/source/split/RocketMQPartitionSplit.java | 100 ++++
 .../split/RocketMQPartitionSplitSerializer.java    |  66 +++
 .../source/split/RocketMQPartitionSplitState.java  |  57 +++
 .../apache/rocketmq/flink/RocketMQSinkTest.java    |  70 ---
 .../common/selector/DefaultTopicSelectorTest.java  |  37 --
 .../common/selector/SimpleTopicSelectorTest.java   |  49 --
 .../SimpleKeyValueSerializationSchemaTest.java     |  42 --
 .../rocketmq/flink/legacy/RocketMQSinkTest.java    |  70 +++
 .../flink/{ => legacy}/RocketMQSourceTest.java     |  52 +-
 .../common/selector/DefaultTopicSelectorTest.java  |  32 ++
 .../common/selector/SimpleTopicSelectorTest.java   |  44 ++
 .../SimpleKeyValueSerializationSchemaTest.java     |  42 ++
 .../RocketMQSourceEnumStateSerializerTest.java     |  84 ++++
 .../source/reader/RocketMQRecordEmitterTest.java   |  97 ++++
 .../RocketMQPartitionSplitSerializerTest.java      |  44 ++
 62 files changed, 2979 insertions(+), 1255 deletions(-)

diff --git a/README.md b/README.md
index 600c57a..97cb1f9 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,7 @@ The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly
 Otherwise, the source doesn't provide any reliability guarantees.
 
 ### KeyValueDeserializationSchema
-The main API for deserializing topic and tags is the `org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema` interface.
+The main API for deserializing topic and tags is the `org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema` interface.
 `rocketmq-flink` includes general purpose `KeyValueDeserializationSchema` implementations called `SimpleKeyValueDeserializationSchema`.
 
 ```java
@@ -26,7 +26,7 @@ Otherwise, the sink reliability guarantees depends on rocketmq producer's retry
 but you can change it by invoking `withAsync(true)`. 
 
 ### KeyValueSerializationSchema
-The main API for serializing topic and tags is the `org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema` interface.
+The main API for serializing topic and tags is the `org.apache.rocketmq.flink.legacy.common.serialization.KeyValueSerializationSchema` interface.
 `rocketmq-flink` includes general purpose `KeyValueSerializationSchema` implementations called `SimpleKeyValueSerializationSchema`.
 
 ```java
@@ -39,7 +39,7 @@ public interface KeyValueSerializationSchema<T> extends Serializable {
 ```
 
 ### TopicSelector
-The main API for selecting topic and tags is the `org.apache.rocketmq.flink.common.selector.TopicSelector` interface.
+The main API for selecting topic and tags is the `org.apache.rocketmq.flink.legacy.common.selector.TopicSelector` interface.
 `rocketmq-flink` includes general purpose `TopicSelector` implementations called `DefaultTopicSelector` and `SimpleTopicSelector`.
 
 ```java
@@ -96,7 +96,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
  ```
 
 ## Configurations
-The following configurations are all from the class `org.apache.rocketmq.flink.RocketMQConfig`.
+The following configurations are all from the class `org.apache.rocketmq.flink.legacy.RocketMQConfig`.
 
 ### Producer Configurations
 | NAME        | DESCRIPTION           | DEFAULT  |
diff --git a/pom.xml b/pom.xml
index 2e19ce5..d5fc49f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,9 +34,10 @@
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <rocketmq.version>4.7.1</rocketmq.version>
-        <flink.version>1.10.1</flink.version>
+        <flink.version>1.12.2</flink.version>
         <commons-lang.version>2.5</commons-lang.version>
         <scala.binary.version>2.11</scala.binary.version>
+        <spotless.version>2.4.2</spotless.version>
     </properties>
 
     <dependencies>
@@ -57,6 +58,26 @@
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime-blink_2.11</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
             <artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
         </dependency>
@@ -149,7 +170,7 @@
                                     <resource>reference.conf</resource>
                                 </transformer>
                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>org.apache.rocketmq.flink.example.RocketMQFlinkExample</mainClass>
+                                    <mainClass>org.apache.rocketmq.flink.legacy.example.RocketMQFlinkExample</mainClass>
                                 </transformer>
                             </transformers>
                         </configuration>
@@ -218,6 +239,38 @@
                     <locale>en</locale>
                 </configuration>
             </plugin>
+            <!-- Due to the Flink build setup, "mvn spotless:apply" and "mvn spotless:check"
+				don't work. You have to use the fully qualified name, i.e.
+				"mvn com.diffplug.spotless:spotless-maven-plugin:apply" -->
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>${spotless.version}</version>
+                <configuration>
+                    <java>
+                        <googleJavaFormat>
+                            <version>1.7</version>
+                            <style>AOSP</style>
+                        </googleJavaFormat>
+
+                        <!-- \# refers to the static imports -->
+                        <importOrder>
+                            <order>org.apache.rocketmq,org.apache.flink,org.apache.flink.shaded,,javax,java,scala,\#</order>
+                        </importOrder>
+
+                        <removeUnusedImports />
+                    </java>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>spotless-check</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
deleted file mode 100644
index 72783a8..0000000
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.rocketmq.flink;
-
-import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang.Validate;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.PullResult;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema;
-import org.apache.rocketmq.flink.common.util.MetricUtils;
-import org.apache.rocketmq.flink.common.util.RetryUtil;
-import org.apache.rocketmq.flink.common.util.RocketMQUtils;
-import org.apache.rocketmq.flink.common.watermark.WaterMarkForAll;
-import org.apache.rocketmq.flink.common.watermark.WaterMarkPerQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.management.ManagementFactory;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.rocketmq.flink.RocketMQConfig.*;
-import static org.apache.rocketmq.flink.common.util.RocketMQUtils.getInteger;
-import static org.apache.rocketmq.flink.common.util.RocketMQUtils.getLong;
-
-/**
- * The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly once reliability guarantees when
- * checkpoints are enabled. Otherwise, the source doesn't provide any reliability guarantees.
- */
-public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
-        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger log = LoggerFactory.getLogger(RocketMQSource.class);
-    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
-    private RunningChecker runningChecker;
-    private transient DefaultMQPullConsumer consumer;
-    private KeyValueDeserializationSchema<OUT> schema;
-    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
-    private Map<MessageQueue, Long> offsetTable;
-    private Map<MessageQueue, Long> restoredOffsets;
-    private List<MessageQueue> messageQueues;
-    private ExecutorService executor;
-
-    // watermark in source
-    private WaterMarkPerQueue waterMarkPerQueue;
-    private WaterMarkForAll waterMarkForAll;
-
-    private ScheduledExecutorService timer;
-    /**
-     * Data for pending but uncommitted offsets.
-     */
-    private LinkedMap pendingOffsetsToCommit;
-    private Properties props;
-    private String topic;
-    private String group;
-    private transient volatile boolean restored;
-    private transient boolean enableCheckpoint;
-    private volatile Object checkPointLock;
-
-    private Meter tpsMetric;
-
-    public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
-        this.schema = schema;
-        this.props = props;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        log.debug("source open....");
-        Validate.notEmpty(props, "Consumer properties can not be empty");
-
-        this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
-        this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
-
-        Validate.notEmpty(topic, "Consumer topic can not be empty");
-        Validate.notEmpty(group, "Consumer group can not be empty");
-
-        this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
-
-        if (offsetTable == null) {
-            offsetTable = new ConcurrentHashMap<>();
-        }
-        if (restoredOffsets == null) {
-            restoredOffsets = new ConcurrentHashMap<>();
-        }
-
-        //use restoredOffsets to init offset table.
-        initOffsetTableFromRestoredOffsets();
-
-        if (pendingOffsetsToCommit == null) {
-            pendingOffsetsToCommit = new LinkedMap();
-        }
-        if (checkPointLock == null) {
-            checkPointLock = new ReentrantLock();
-        }
-        if (waterMarkPerQueue == null) {
-            waterMarkPerQueue = new WaterMarkPerQueue(5000);
-        }
-        if (waterMarkForAll == null) {
-            waterMarkForAll = new WaterMarkForAll(5000);
-        }
-        if (timer == null) {
-            timer = Executors.newSingleThreadScheduledExecutor();
-        }
-
-        runningChecker = new RunningChecker();
-        runningChecker.setRunning(true);
-
-        final ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                .setDaemon(true).setNameFormat("rmq-pull-thread-%d").build();
-        executor = Executors.newCachedThreadPool(threadFactory);
-
-        int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
-        consumer = new DefaultMQPullConsumer(group, RocketMQConfig.buildAclRPCHook(props));
-        RocketMQConfig.buildConsumerConfigs(props, consumer);
-
-        // set unique instance name, avoid exception: https://help.aliyun.com/document_detail/29646.html
-        String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
-        String instanceName = RocketMQUtils.getInstanceName(runtimeName, topic, group,
-                String.valueOf(indexOfThisSubTask), String.valueOf(System.nanoTime()));
-        consumer.setInstanceName(instanceName);
-        consumer.start();
-
-        Counter outputCounter = getRuntimeContext().getMetricGroup()
-                .counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter());
-        tpsMetric = getRuntimeContext().getMetricGroup()
-                .meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60));
-    }
-
-    @Override
-    public void run(SourceContext context) throws Exception {
-        String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
-        int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE);
-
-        final RuntimeContext ctx = getRuntimeContext();
-        // The lock that guarantees that record emission and state updates are atomic,
-        // from the view of taking a checkpoint.
-        int taskNumber = ctx.getNumberOfParallelSubtasks();
-        int taskIndex = ctx.getIndexOfThisSubtask();
-        log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}", taskNumber, taskIndex);
-
-
-        timer.scheduleAtFixedRate(() -> {
-            // context.emitWatermark(waterMarkPerQueue.getCurrentWatermark());
-            context.emitWatermark(waterMarkForAll.getCurrentWatermark());
-        }, 5, 5, TimeUnit.SECONDS);
-
-        Collection<MessageQueue> totalQueues = consumer.fetchSubscribeMessageQueues(topic);
-        messageQueues = RocketMQUtils.allocate(totalQueues, taskNumber, ctx.getIndexOfThisSubtask());
-        for (MessageQueue mq : messageQueues) {
-            this.executor.execute(() -> {
-                RetryUtil.call(() -> {
-                    while (runningChecker.isRunning()) {
-                        try {
-                            long offset = getMessageQueueOffset(mq);
-                            PullResult pullResult = consumer.pullBlockIfNotFound(mq, tag, offset, pullBatchSize);
-
-                            boolean found = false;
-                            switch (pullResult.getPullStatus()) {
-                                case FOUND:
-                                    List<MessageExt> messages = pullResult.getMsgFoundList();
-                                    for (MessageExt msg : messages) {
-                                        byte[] key = msg.getKeys() != null ? msg.getKeys().getBytes(StandardCharsets.UTF_8) : null;
-                                        byte[] value = msg.getBody();
-                                        OUT data = schema.deserializeKeyAndValue(key, value);
-
-                                        // output and state update are atomic
-                                        synchronized (checkPointLock) {
-                                            log.debug(msg.getMsgId() + "_" + msg.getBrokerName() + " " + msg.getQueueId() + " " + msg.getQueueOffset());
-                                            context.collectWithTimestamp(data, msg.getBornTimestamp());
-
-                                            // update max eventTime per queue
-                                            // waterMarkPerQueue.extractTimestamp(mq, msg.getBornTimestamp());
-                                            waterMarkForAll.extractTimestamp(msg.getBornTimestamp());
-                                            tpsMetric.markEvent();
-                                        }
-                                    }
-                                    found = true;
-                                    break;
-                                case NO_MATCHED_MSG:
-                                    log.debug("No matched message after offset {} for queue {}", offset, mq);
-                                    break;
-                                case NO_NEW_MSG:
-                                    log.debug("No new message after offset {} for queue {}", offset, mq);
-                                    break;
-                                case OFFSET_ILLEGAL:
-                                    log.warn("Offset {} is illegal for queue {}", offset, mq);
-                                    break;
-                                default:
-                                    break;
-                            }
-
-                            synchronized (checkPointLock) {
-                                updateMessageQueueOffset(mq, pullResult.getNextBeginOffset());
-                            }
-
-                            if (!found) {
-                                RetryUtil.waitForMs(RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
-                            }
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                    return true;
-                }, "RuntimeException");
-            });
-        }
-
-        awaitTermination();
-    }
-
-    private void awaitTermination() throws InterruptedException {
-        while (runningChecker.isRunning()) {
-            Thread.sleep(50);
-        }
-    }
-
-    private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
-        Long offset = offsetTable.get(mq);
-        // restoredOffsets(unionOffsetStates) is the restored global union state;
-        // should only snapshot mqs that actually belong to us
-        if (offset == null) {
-            // fetchConsumeOffset from broker
-            offset = consumer.fetchConsumeOffset(mq, false);
-            if (!restored || offset < 0) {
-                String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
-                switch (initialOffset) {
-                    case CONSUMER_OFFSET_EARLIEST:
-                        offset = consumer.minOffset(mq);
-                        break;
-                    case CONSUMER_OFFSET_LATEST:
-                        offset = consumer.maxOffset(mq);
-                        break;
-                    case CONSUMER_OFFSET_TIMESTAMP:
-                        offset = consumer.searchOffset(mq, getLong(props,
-                                RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis()));
-                        break;
-                    default:
-                        throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO.");
-                }
-            }
-        }
-        offsetTable.put(mq, offset);
-        return offsetTable.get(mq);
-    }
-
-    private void updateMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
-        offsetTable.put(mq, offset);
-        if (!enableCheckpoint) {
-            consumer.updateConsumeOffset(mq, offset);
-        }
-    }
-
-    @Override
-    public void cancel() {
-        log.debug("cancel ...");
-        runningChecker.setRunning(false);
-
-        if (consumer != null) {
-            consumer.shutdown();
-        }
-
-        if (offsetTable != null) {
-            offsetTable.clear();
-        }
-        if (restoredOffsets != null) {
-            restoredOffsets.clear();
-        }
-        if (pendingOffsetsToCommit != null) {
-            pendingOffsetsToCommit.clear();
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        log.debug("close ...");
-        // pretty much the same logic as cancelling
-        try {
-            cancel();
-        } finally {
-            super.close();
-        }
-    }
-
-    public void initOffsetTableFromRestoredOffsets() {
-        Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
-        restoredOffsets.forEach((mq, offset) -> {
-            if (!offsetTable.containsKey(mq) || offsetTable.get(mq) < offset) {
-                offsetTable.put(mq, offset);
-            }
-        });
-        log.info("init offset table from restoredOffsets successful.", offsetTable);
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        // called when a snapshot for a checkpoint is requested
-        log.info("Snapshotting state {} ...", context.getCheckpointId());
-        if (!runningChecker.isRunning()) {
-            log.info("snapshotState() called on closed source; returning null.");
-            return;
-        }
-
-        // Discovery topic Route change when snapshot
-        RetryUtil.call(() -> {
-            Collection<MessageQueue> totalQueues = consumer.fetchSubscribeMessageQueues(topic);
-            int taskNumber = getRuntimeContext().getNumberOfParallelSubtasks();
-            int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
-            List<MessageQueue> newQueues = RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
-            Collections.sort(newQueues);
-            log.debug(taskIndex + " Topic route is same.");
-            if (!messageQueues.equals(newQueues)) {
-                throw new RuntimeException();
-            }
-            return true;
-        }, "RuntimeException due to topic route changed");
-
-        unionOffsetStates.clear();
-        HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());
-        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
-            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
-            currentOffsets.put(entry.getKey(), entry.getValue());
-        }
-        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
-        log.info("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
-                offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
-    }
-
-    /**
-     * called every time the user-defined function is initialized,
-     * be that when the function is first initialized or be that
-     * when the function is actually recovering from an earlier checkpoint.
-     * Given this, initializeState() is not only the place where different types of state are initialized,
-     * but also where state recovery logic is included.
-     */
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        log.info("initialize State ...");
-
-        this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
-                OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {
-        })));
-        this.restored = context.isRestored();
-
-        if (restored) {
-            if (restoredOffsets == null) {
-                restoredOffsets = new ConcurrentHashMap<>();
-            }
-            for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
-                if (!restoredOffsets.containsKey(mqOffsets.f0) || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
-                    restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
-                }
-            }
-            log.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets);
-        } else {
-            log.info("No restore state for the consumer.");
-        }
-    }
-
-    @Override
-    public TypeInformation getProducedType() {
-        return schema.getProducedType();
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        // callback when checkpoint complete
-        if (!runningChecker.isRunning()) {
-            log.info("notifyCheckpointComplete() called on closed source; returning null.");
-            return;
-        }
-
-        final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
-        if (posInMap == -1) {
-            log.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
-            return;
-        }
-
-        Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap);
-
-        // remove older checkpoints in map
-        for (int i = 0; i < posInMap; i++) {
-            pendingOffsetsToCommit.remove(0);
-        }
-
-        if (offsets == null || offsets.size() == 0) {
-            log.debug("Checkpoint state was empty.");
-            return;
-        }
-
-        for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
-            consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
-        }
-    }
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/RunningChecker.java b/src/main/java/org/apache/rocketmq/flink/RunningChecker.java
deleted file mode 100644
index b7bc2b9..0000000
--- a/src/main/java/org/apache/rocketmq/flink/RunningChecker.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink;
-
-import java.io.Serializable;
-
-public class RunningChecker implements Serializable {
-    private volatile boolean isRunning = false;
-
-    public boolean isRunning() {
-        return isRunning;
-    }
-
-    public void setRunning(boolean running) {
-        isRunning = running;
-    }
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java b/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java
deleted file mode 100644
index 2a347db..0000000
--- a/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.selector;
-
-import java.io.Serializable;
-
-public interface TopicSelector<T> extends Serializable {
-
-    String getTopic(T tuple);
-
-    String getTag(T tuple);
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java
deleted file mode 100644
index d8759f9..0000000
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.serialization;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T>, Serializable {
-    T deserializeKeyAndValue(byte[] key, byte[] value);
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java
deleted file mode 100644
index d847e8a..0000000
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.serialization;
-
-import java.io.Serializable;
-
-public interface KeyValueSerializationSchema<T> extends Serializable {
-
-    byte[] serializeKey(T tuple);
-
-    byte[] serializeValue(T tuple);
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/util/MetricUtils.java b/src/main/java/org/apache/rocketmq/flink/common/util/MetricUtils.java
deleted file mode 100644
index 764d01f..0000000
--- a/src/main/java/org/apache/rocketmq/flink/common/util/MetricUtils.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.util;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.SimpleCounter;
-
-/**
- * RocketMQ connector metrics.
- */
-public class MetricUtils {
-
-    public static final String METRICS_TPS = "tps";
-
-    private static final String METRIC_GROUP_SINK = "sink";
-    private static final String METRICS_SINK_IN_TPS = "inTps";
-    private static final String METRICS_SINK_OUT_TPS = "outTps";
-    private static final String METRICS_SINK_OUT_BPS = "outBps";
-    private static final String METRICS_SINK_OUT_Latency = "outLatency";
-
-    public static Meter registerSinkInTps(RuntimeContext context) {
-        Counter parserCounter = context.getMetricGroup().addGroup(METRIC_GROUP_SINK)
-            .counter(METRICS_SINK_IN_TPS + "_counter", new SimpleCounter());
-        return context.getMetricGroup().addGroup(METRIC_GROUP_SINK)
-            .meter(METRICS_SINK_IN_TPS, new MeterView(parserCounter, 60));
-    }
-
-    public static Meter registerOutTps(RuntimeContext context) {
-        Counter parserCounter = context.getMetricGroup().addGroup(METRIC_GROUP_SINK)
-            .counter(METRICS_SINK_OUT_TPS + "_counter", new SimpleCounter());
-        return context.getMetricGroup().addGroup(METRIC_GROUP_SINK)
-                .meter(METRICS_SINK_OUT_TPS, new MeterView(parserCounter, 60));
-    }
-
-    public static Meter registerOutBps(RuntimeContext context) {
-        Counter bpsCounter = context.getMetricGroup().addGroup(METRIC_GROUP_SINK)
-                .counter(METRICS_SINK_OUT_BPS + "_counter", new SimpleCounter());
-        return context.getMetricGroup().addGroup(METRIC_GROUP_SINK)
-                .meter(METRICS_SINK_OUT_BPS, new MeterView(bpsCounter, 60));
-    }
-
-    public static LatencyGauge registerOutLatency(RuntimeContext context) {
-        return context.getMetricGroup().addGroup(METRIC_GROUP_SINK).gauge(METRICS_SINK_OUT_Latency, new LatencyGauge());
-    }
-
-    public static class LatencyGauge implements Gauge<Double> {
-        private double value;
-
-        public void report(long timeDelta, long batchSize) {
-            if (batchSize != 0) {
-                this.value = (1.0 * timeDelta) / batchSize;
-            }
-        }
-
-        @Override
-        public Double getValue() {
-            return value;
-        }
-    }
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/util/RocketMQUtils.java b/src/main/java/org/apache/rocketmq/flink/common/util/RocketMQUtils.java
deleted file mode 100644
index fc37b04..0000000
--- a/src/main/java/org/apache/rocketmq/flink/common/util/RocketMQUtils.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.util;
-
-import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.common.message.MessageQueue;
-
-import java.lang.management.ManagementFactory;
-import java.util.*;
-
-public final class RocketMQUtils {
-
-    public static int getInteger(Properties props, String key, int defaultValue) {
-        return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
-    }
-
-    public static long getLong(Properties props, String key, long defaultValue) {
-        return Long.parseLong(props.getProperty(key, String.valueOf(defaultValue)));
-    }
-
-    public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
-        return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
-    }
-
-    public static AccessChannel getAccessChannel(Properties props, String key, AccessChannel defaultValue) {
-        return AccessChannel.valueOf(props.getProperty(key, String.valueOf(defaultValue)));
-    }
-
-    public static String getInstanceName(String... args) {
-        if (null != args && args.length > 0) {
-            return String.join("_", args);
-        }
-        return ManagementFactory.getRuntimeMXBean().getName() + "_" + System.nanoTime();
-    }
-
-    /**
-     * Average Hashing queue algorithm
-     * Refer: org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely
-     */
-    public static List<MessageQueue> allocate(Collection<MessageQueue> mqSet,
-                                              int numberOfParallelTasks,
-                                              int indexOfThisTask) {
-        ArrayList<MessageQueue> mqAll = new ArrayList<>(mqSet);
-        Collections.sort(mqAll);
-        List<MessageQueue> result = new ArrayList<>();
-        int mod = mqAll.size() % numberOfParallelTasks;
-        int averageSize = mqAll.size() <= numberOfParallelTasks ? 1 : (mod > 0 && indexOfThisTask < mod ?
-                mqAll.size() / numberOfParallelTasks + 1 : mqAll.size() / numberOfParallelTasks);
-        int startIndex = (mod > 0 && indexOfThisTask < mod) ? indexOfThisTask * averageSize :
-                indexOfThisTask * averageSize + mod;
-        int range = Math.min(averageSize, mqAll.size() - startIndex);
-        for (int i = 0; i < range; i++) {
-            result.add(mqAll.get((startIndex + i) % mqAll.size()));
-        }
-        return result;
-    }
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/util/TestUtils.java b/src/main/java/org/apache/rocketmq/flink/common/util/TestUtils.java
deleted file mode 100644
index 71d1265..0000000
--- a/src/main/java/org/apache/rocketmq/flink/common/util/TestUtils.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.util;
-
-import java.lang.reflect.Field;
-
-public class TestUtils {
-    public static void setFieldValue(Object obj, String fieldName, Object value) {
-        try {
-            Field field = obj.getClass().getDeclaredField(fieldName);
-            field.setAccessible(true);
-            field.set(obj, value);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-}
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
similarity index 77%
rename from src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
index c1bad2d..5c19b7a 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
@@ -1,25 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.flink.legacy;
 
-package org.apache.rocketmq.flink;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.AccessChannel;
@@ -28,15 +22,16 @@ import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
+
 import java.util.Properties;
 import java.util.UUID;
 
-import static org.apache.rocketmq.flink.common.util.RocketMQUtils.getAccessChannel;
-import static org.apache.rocketmq.flink.common.util.RocketMQUtils.getInteger;
+import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getAccessChannel;
+import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getInteger;
 
-/**
- * RocketMQConfig for Consumer/Producer.
- */
+/** RocketMQConfig for Consumer/Producer. */
 public class RocketMQConfig {
     // Server Config
     public static final String NAME_SERVER_ADDR = "nameserver.address"; // Required
@@ -77,13 +72,15 @@ public class RocketMQConfig {
     public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
     public static final String CONSUMER_OFFSET_FROM_TIMESTAMP = "consumer.offset.from.timestamp";
 
-    public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
+    public static final String CONSUMER_OFFSET_PERSIST_INTERVAL =
+            "consumer.offset.persist.interval";
     public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds
 
     public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size";
     public static final int DEFAULT_CONSUMER_BATCH_SIZE = 32;
 
-    public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = "consumer.delay.when.message.not.found";
+    public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND =
+            "consumer.delay.when.message.not.found";
     public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 100;
 
     public static final String CONSUMER_INDEX_OF_THIS_SUB_TASK = "consumer.index";
@@ -116,6 +113,7 @@ public class RocketMQConfig {
 
     /**
      * Build Producer Configs.
+     *
      * @param props Properties
      * @param producer DefaultMQProducer
      */
@@ -126,27 +124,32 @@ public class RocketMQConfig {
             group = UUID.randomUUID().toString();
         }
         producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, group));
-        producer.setRetryTimesWhenSendFailed(getInteger(props, PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
-        producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
-                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+        producer.setRetryTimesWhenSendFailed(
+                getInteger(props, PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+        producer.setRetryTimesWhenSendAsyncFailed(
+                getInteger(props, PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
         producer.setSendMsgTimeout(getInteger(props, PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
-
     }
 
     /**
      * Build Consumer Configs.
+     *
      * @param props Properties
      * @param consumer DefaultMQPullConsumer
      */
     public static void buildConsumerConfigs(Properties props, DefaultMQPullConsumer consumer) {
         buildCommonConfigs(props, consumer);
         consumer.setMessageModel(MessageModel.CLUSTERING);
-        consumer.setPersistConsumerOffsetInterval(getInteger(props,
-                CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
+        consumer.setPersistConsumerOffsetInterval(
+                getInteger(
+                        props,
+                        CONSUMER_OFFSET_PERSIST_INTERVAL,
+                        DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
     }
 
     /**
      * Build Common Configs.
+     *
      * @param props Properties
      * @param client ClientConfig
      */
@@ -154,8 +157,8 @@ public class RocketMQConfig {
         String nameServers = props.getProperty(NAME_SERVER_ADDR);
         Validate.notEmpty(nameServers);
         client.setNamesrvAddr(nameServers);
-        client.setHeartbeatBrokerInterval(getInteger(props,
-                BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
+        client.setHeartbeatBrokerInterval(
+                getInteger(props, BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
         // When using aliyun products, you need to set up channels
         client.setAccessChannel((getAccessChannel(props, ACCESS_CHANNEL, DEFAULT_ACCESS_CHANNEL)));
         client.setUnitName(props.getProperty(UNIT_NAME, null));
@@ -163,6 +166,7 @@ public class RocketMQConfig {
 
     /**
      * Build credentials for client.
+     *
      * @param props
      * @return
      */
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
similarity index 69%
rename from src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
index 865af75..f91a684 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
@@ -1,24 +1,28 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.flink.legacy;
 
-package org.apache.rocketmq.flink;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.flink.legacy.common.util.MetricUtils;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 
-import org.apache.commons.lang.Validate;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -26,15 +30,8 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.flink.common.util.MetricUtils;
-import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import org.apache.commons.lang.Validate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,9 +41,9 @@ import java.util.Properties;
 import java.util.UUID;
 
 /**
- * The RocketMQSink provides at-least-once reliability guarantees when
- * checkpoints are enabled and batchFlushOnCheckpoint(true) is set.
- * Otherwise, the sink reliability guarantees depends on rocketmq producer's retry policy.
+ * The RocketMQSink provides at-least-once reliability guarantees when checkpoints are enabled and
+ * batchFlushOnCheckpoint(true) is set. Otherwise, the sink reliability guarantees depends on
+ * rocketmq producer's retry policy.
  */
 public class RocketMQSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
 
@@ -78,14 +75,17 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
 
         // with authentication hook
         producer = new DefaultMQProducer(RocketMQConfig.buildAclRPCHook(props));
-        producer.setInstanceName(getRuntimeContext().getIndexOfThisSubtask() + "_" + UUID.randomUUID());
+        producer.setInstanceName(
+                getRuntimeContext().getIndexOfThisSubtask() + "_" + UUID.randomUUID());
 
         RocketMQConfig.buildProducerConfigs(props, producer);
 
         batchList = new LinkedList<>();
 
-        if (batchFlushOnCheckpoint && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
-            LOG.info("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
+        if (batchFlushOnCheckpoint
+                && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
+            LOG.info(
+                    "Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
             batchFlushOnCheckpoint = false;
         }
 
@@ -117,23 +117,25 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
         long timeStartWriting = System.currentTimeMillis();
         if (async) {
             try {
-                producer.send(msg, new SendCallback() {
-                    @Override
-                    public void onSuccess(SendResult sendResult) {
-                        LOG.debug("Async send message success! result: {}", sendResult);
-                        long end = System.currentTimeMillis();
-                        latencyGauge.report(end - timeStartWriting, 1);
-                        outTps.markEvent();
-                        outBps.markEvent(msg.getBody().length);
-                    }
-
-                    @Override
-                    public void onException(Throwable throwable) {
-                        if (throwable != null) {
-                            LOG.error("Async send message failure!", throwable);
-                        }
-                    }
-                });
+                producer.send(
+                        msg,
+                        new SendCallback() {
+                            @Override
+                            public void onSuccess(SendResult sendResult) {
+                                LOG.debug("Async send message success! result: {}", sendResult);
+                                long end = System.currentTimeMillis();
+                                latencyGauge.report(end - timeStartWriting, 1);
+                                outTps.markEvent();
+                                outBps.markEvent(msg.getBody().length);
+                            }
+
+                            @Override
+                            public void onException(Throwable throwable) {
+                                if (throwable != null) {
+                                    LOG.error("Async send message failure!", throwable);
+                                }
+                            }
+                        });
             } catch (Exception e) {
                 LOG.error("Async send message failure!", e);
             }
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java
new file mode 100644
index 0000000..84260b6
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema;
+import org.apache.rocketmq.flink.legacy.common.util.MetricUtils;
+import org.apache.rocketmq.flink.legacy.common.util.RetryUtil;
+import org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils;
+import org.apache.rocketmq.flink.legacy.common.watermark.WaterMarkForAll;
+import org.apache.rocketmq.flink.legacy.common.watermark.WaterMarkPerQueue;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.curator4.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.lang.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_BATCH_SIZE;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_EARLIEST;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE;
+import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getInteger;
+import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getLong;
+
+/**
+ * The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly once reliability
+ * guarantees when checkpoints are enabled. Otherwise, the source doesn't provide any reliability
+ * guarantees.
+ */
+public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
+        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger log = LoggerFactory.getLogger(RocketMQSource.class);
+    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
+    private RunningChecker runningChecker;
+    private transient DefaultMQPullConsumer consumer;
+    private KeyValueDeserializationSchema<OUT> schema;
+    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
+    private Map<MessageQueue, Long> offsetTable;
+    private Map<MessageQueue, Long> restoredOffsets;
+    private List<MessageQueue> messageQueues;
+    private ExecutorService executor;
+
+    // watermark in source
+    private WaterMarkPerQueue waterMarkPerQueue;
+    private WaterMarkForAll waterMarkForAll;
+
+    private ScheduledExecutorService timer;
+    /** Data for pending but uncommitted offsets. */
+    private LinkedMap pendingOffsetsToCommit;
+
+    private Properties props;
+    private String topic;
+    private String group;
+    private transient volatile boolean restored;
+    private transient boolean enableCheckpoint;
+    private volatile Object checkPointLock;
+
+    private Meter tpsMetric;
+
+    public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
+        this.schema = schema;
+        this.props = props;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        log.debug("source open....");
+        Validate.notEmpty(props, "Consumer properties can not be empty");
+
+        this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
+        this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
+
+        Validate.notEmpty(topic, "Consumer topic can not be empty");
+        Validate.notEmpty(group, "Consumer group can not be empty");
+
+        this.enableCheckpoint =
+                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
+
+        if (offsetTable == null) {
+            offsetTable = new ConcurrentHashMap<>();
+        }
+        if (restoredOffsets == null) {
+            restoredOffsets = new ConcurrentHashMap<>();
+        }
+
+        // use restoredOffsets to init offset table.
+        initOffsetTableFromRestoredOffsets();
+
+        if (pendingOffsetsToCommit == null) {
+            pendingOffsetsToCommit = new LinkedMap();
+        }
+        if (checkPointLock == null) {
+            checkPointLock = new ReentrantLock();
+        }
+        if (waterMarkPerQueue == null) {
+            waterMarkPerQueue = new WaterMarkPerQueue(5000);
+        }
+        if (waterMarkForAll == null) {
+            waterMarkForAll = new WaterMarkForAll(5000);
+        }
+        if (timer == null) {
+            timer = Executors.newSingleThreadScheduledExecutor();
+        }
+
+        runningChecker = new RunningChecker();
+        runningChecker.setRunning(true);
+
+        final ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("rmq-pull-thread-%d")
+                        .build();
+        executor = Executors.newCachedThreadPool(threadFactory);
+
+        int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
+        consumer = new DefaultMQPullConsumer(group, RocketMQConfig.buildAclRPCHook(props));
+        RocketMQConfig.buildConsumerConfigs(props, consumer);
+
+        // set unique instance name, avoid exception:
+        // https://help.aliyun.com/document_detail/29646.html
+        String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
+        String instanceName =
+                RocketMQUtils.getInstanceName(
+                        runtimeName,
+                        topic,
+                        group,
+                        String.valueOf(indexOfThisSubTask),
+                        String.valueOf(System.nanoTime()));
+        consumer.setInstanceName(instanceName);
+        consumer.start();
+
+        Counter outputCounter =
+                getRuntimeContext()
+                        .getMetricGroup()
+                        .counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter());
+        tpsMetric =
+                getRuntimeContext()
+                        .getMetricGroup()
+                        .meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60));
+    }
+
+    @Override
+    public void run(SourceContext context) throws Exception {
+        String tag =
+                props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
+        int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE);
+
+        final RuntimeContext ctx = getRuntimeContext();
+        // The lock that guarantees that record emission and state updates are atomic,
+        // from the view of taking a checkpoint.
+        int taskNumber = ctx.getNumberOfParallelSubtasks();
+        int taskIndex = ctx.getIndexOfThisSubtask();
+        log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}", taskNumber, taskIndex);
+
+        timer.scheduleAtFixedRate(
+                () -> {
+                    // context.emitWatermark(waterMarkPerQueue.getCurrentWatermark());
+                    context.emitWatermark(waterMarkForAll.getCurrentWatermark());
+                },
+                5,
+                5,
+                TimeUnit.SECONDS);
+
+        Collection<MessageQueue> totalQueues = consumer.fetchSubscribeMessageQueues(topic);
+        messageQueues =
+                RocketMQUtils.allocate(totalQueues, taskNumber, ctx.getIndexOfThisSubtask());
+        for (MessageQueue mq : messageQueues) {
+            this.executor.execute(
+                    () -> {
+                        RetryUtil.call(
+                                () -> {
+                                    while (runningChecker.isRunning()) {
+                                        try {
+                                            long offset = getMessageQueueOffset(mq);
+                                            PullResult pullResult =
+                                                    consumer.pullBlockIfNotFound(
+                                                            mq, tag, offset, pullBatchSize);
+
+                                            boolean found = false;
+                                            switch (pullResult.getPullStatus()) {
+                                                case FOUND:
+                                                    List<MessageExt> messages =
+                                                            pullResult.getMsgFoundList();
+                                                    for (MessageExt msg : messages) {
+                                                        byte[] key =
+                                                                msg.getKeys() != null
+                                                                        ? msg.getKeys()
+                                                                                .getBytes(
+                                                                                        StandardCharsets
+                                                                                                .UTF_8)
+                                                                        : null;
+                                                        byte[] value = msg.getBody();
+                                                        OUT data =
+                                                                schema.deserializeKeyAndValue(
+                                                                        key, value);
+
+                                                        // output and state update are atomic
+                                                        synchronized (checkPointLock) {
+                                                            log.debug(
+                                                                    msg.getMsgId()
+                                                                            + "_"
+                                                                            + msg.getBrokerName()
+                                                                            + " "
+                                                                            + msg.getQueueId()
+                                                                            + " "
+                                                                            + msg.getQueueOffset());
+                                                            context.collectWithTimestamp(
+                                                                    data, msg.getBornTimestamp());
+
+                                                            // update max eventTime per queue
+                                                            // waterMarkPerQueue.extractTimestamp(mq, msg.getBornTimestamp());
+                                                            waterMarkForAll.extractTimestamp(
+                                                                    msg.getBornTimestamp());
+                                                            tpsMetric.markEvent();
+                                                        }
+                                                    }
+                                                    found = true;
+                                                    break;
+                                                case NO_MATCHED_MSG:
+                                                    log.debug(
+                                                            "No matched message after offset {} for queue {}",
+                                                            offset,
+                                                            mq);
+                                                    break;
+                                                case NO_NEW_MSG:
+                                                    log.debug(
+                                                            "No new message after offset {} for queue {}",
+                                                            offset,
+                                                            mq);
+                                                    break;
+                                                case OFFSET_ILLEGAL:
+                                                    log.warn(
+                                                            "Offset {} is illegal for queue {}",
+                                                            offset,
+                                                            mq);
+                                                    break;
+                                                default:
+                                                    break;
+                                            }
+
+                                            synchronized (checkPointLock) {
+                                                updateMessageQueueOffset(
+                                                        mq, pullResult.getNextBeginOffset());
+                                            }
+
+                                            if (!found) {
+                                                RetryUtil.waitForMs(
+                                                        RocketMQConfig
+                                                                .DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
+                                            }
+                                        } catch (Exception e) {
+                                            throw new RuntimeException(e);
+                                        }
+                                    }
+                                    return true;
+                                },
+                                "RuntimeException");
+                    });
+        }
+
+        awaitTermination();
+    }
+
+    private void awaitTermination() throws InterruptedException {
+        while (runningChecker.isRunning()) {
+            Thread.sleep(50);
+        }
+    }
+
+    private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
+        Long offset = offsetTable.get(mq);
+        // restoredOffsets(unionOffsetStates) is the restored global union state;
+        // should only snapshot mqs that actually belong to us
+        if (offset == null) {
+            // fetchConsumeOffset from broker
+            offset = consumer.fetchConsumeOffset(mq, false);
+            if (!restored || offset < 0) {
+                String initialOffset =
+                        props.getProperty(
+                                RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
+                switch (initialOffset) {
+                    case CONSUMER_OFFSET_EARLIEST:
+                        offset = consumer.minOffset(mq);
+                        break;
+                    case CONSUMER_OFFSET_LATEST:
+                        offset = consumer.maxOffset(mq);
+                        break;
+                    case CONSUMER_OFFSET_TIMESTAMP:
+                        offset =
+                                consumer.searchOffset(
+                                        mq,
+                                        getLong(
+                                                props,
+                                                RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP,
+                                                System.currentTimeMillis()));
+                        break;
+                    default:
+                        throw new IllegalArgumentException(
+                                "Unknown value for CONSUMER_OFFSET_RESET_TO.");
+                }
+            }
+        }
+        offsetTable.put(mq, offset);
+        return offsetTable.get(mq);
+    }
+
+    private void updateMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
+        offsetTable.put(mq, offset);
+        if (!enableCheckpoint) {
+            consumer.updateConsumeOffset(mq, offset);
+        }
+    }
+
+    @Override
+    public void cancel() {
+        log.debug("cancel ...");
+        runningChecker.setRunning(false);
+
+        if (consumer != null) {
+            consumer.shutdown();
+        }
+
+        if (offsetTable != null) {
+            offsetTable.clear();
+        }
+        if (restoredOffsets != null) {
+            restoredOffsets.clear();
+        }
+        if (pendingOffsetsToCommit != null) {
+            pendingOffsetsToCommit.clear();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        log.debug("close ...");
+        // pretty much the same logic as cancelling
+        try {
+            cancel();
+        } finally {
+            super.close();
+        }
+    }
+
+    public void initOffsetTableFromRestoredOffsets() {
+        Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
+        restoredOffsets.forEach(
+                (mq, offset) -> {
+                    if (!offsetTable.containsKey(mq) || offsetTable.get(mq) < offset) {
+                        offsetTable.put(mq, offset);
+                    }
+                });
+        log.info("init offset table from restoredOffsets successful.", offsetTable);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        // called when a snapshot for a checkpoint is requested
+        log.info("Snapshotting state {} ...", context.getCheckpointId());
+        if (!runningChecker.isRunning()) {
+            log.info("snapshotState() called on closed source; returning null.");
+            return;
+        }
+
+        // Discovery topic Route change when snapshot
+        RetryUtil.call(
+                () -> {
+                    Collection<MessageQueue> totalQueues =
+                            consumer.fetchSubscribeMessageQueues(topic);
+                    int taskNumber = getRuntimeContext().getNumberOfParallelSubtasks();
+                    int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
+                    List<MessageQueue> newQueues =
+                            RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
+                    Collections.sort(newQueues);
+                    log.debug(taskIndex + " Topic route is same.");
+                    if (!messageQueues.equals(newQueues)) {
+                        throw new RuntimeException();
+                    }
+                    return true;
+                },
+                "RuntimeException due to topic route changed");
+
+        unionOffsetStates.clear();
+        HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());
+        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
+            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
+            currentOffsets.put(entry.getKey(), entry.getValue());
+        }
+        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+        log.info(
+                "Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
+                offsetTable,
+                context.getCheckpointId(),
+                context.getCheckpointTimestamp());
+    }
+
+    /**
+     * called every time the user-defined function is initialized, be that when the function is
+     * first initialized or be that when the function is actually recovering from an earlier
+     * checkpoint. Given this, initializeState() is not only the place where different types of
+     * state are initialized, but also where state recovery logic is included.
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        log.info("initialize State ...");
+
+        this.unionOffsetStates =
+                context.getOperatorStateStore()
+                        .getUnionListState(
+                                new ListStateDescriptor<>(
+                                        OFFSETS_STATE_NAME,
+                                        TypeInformation.of(
+                                                new TypeHint<Tuple2<MessageQueue, Long>>() {})));
+        this.restored = context.isRestored();
+
+        if (restored) {
+            if (restoredOffsets == null) {
+                restoredOffsets = new ConcurrentHashMap<>();
+            }
+            for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
+                if (!restoredOffsets.containsKey(mqOffsets.f0)
+                        || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
+                    restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
+                }
+            }
+            log.info(
+                    "Setting restore state in the consumer. Using the following offsets: {}",
+                    restoredOffsets);
+        } else {
+            log.info("No restore state for the consumer.");
+        }
+    }
+
+    @Override
+    public TypeInformation getProducedType() {
+        return schema.getProducedType();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // callback when checkpoint complete
+        if (!runningChecker.isRunning()) {
+            log.info("notifyCheckpointComplete() called on closed source; returning null.");
+            return;
+        }
+
+        final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+        if (posInMap == -1) {
+            log.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+            return;
+        }
+
+        Map<MessageQueue, Long> offsets =
+                (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap);
+
+        // remove older checkpoints in map
+        for (int i = 0; i < posInMap; i++) {
+            pendingOffsetsToCommit.remove(0);
+        }
+
+        if (offsets == null || offsets.size() == 0) {
+            log.debug("Checkpoint state was empty.");
+            return;
+        }
+
+        for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
+            consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java b/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java
new file mode 100644
index 0000000..c48361a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy;
+
+import java.io.Serializable;
+
+public class RunningChecker implements Serializable {
+    private volatile boolean isRunning = false;
+
+    public boolean isRunning() {
+        return isRunning;
+    }
+
+    public void setRunning(boolean running) {
+        isRunning = running;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..6be5218
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.selector;
+
+public class DefaultTopicSelector<T> implements TopicSelector<T> {
+    private final String topicName;
+    private final String tagName;
+
+    public DefaultTopicSelector(final String topicName, final String tagName) {
+        this.topicName = topicName;
+        this.tagName = tagName;
+    }
+
+    public DefaultTopicSelector(final String topicName) {
+        this(topicName, "");
+    }
+
+    @Override
+    public String getTopic(T tuple) {
+        return topicName;
+    }
+
+    @Override
+    public String getTag(T tuple) {
+        return tagName;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java
similarity index 55%
rename from src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java
index 3ad8a03..674b5a0 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java
@@ -1,31 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.rocketmq.flink.common.selector;
-
-import java.util.Map;
+package org.apache.rocketmq.flink.legacy.common.selector;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * Uses field name to select topic and tag name from tuple.
- */
+import java.util.Map;
+
+/** Uses field name to select topic and tag name from tuple. */
 public class SimpleTopicSelector implements TopicSelector<Map> {
     private static final Logger LOG = LoggerFactory.getLogger(SimpleTopicSelector.class);
 
@@ -37,12 +31,17 @@ public class SimpleTopicSelector implements TopicSelector<Map> {
 
     /**
      * SimpleTopicSelector Constructor.
+     *
      * @param topicFieldName field name used for selecting topic
      * @param defaultTopicName default field name used for selecting topic
      * @param tagFieldName field name used for selecting tag
      * @param defaultTagName default field name used for selecting tag
      */
-    public SimpleTopicSelector(String topicFieldName, String defaultTopicName, String tagFieldName, String defaultTagName) {
+    public SimpleTopicSelector(
+            String topicFieldName,
+            String defaultTopicName,
+            String tagFieldName,
+            String defaultTagName) {
         this.topicFieldName = topicFieldName;
         this.defaultTopicName = defaultTopicName;
         this.tagFieldName = tagFieldName;
@@ -52,10 +51,13 @@ public class SimpleTopicSelector implements TopicSelector<Map> {
     @Override
     public String getTopic(Map tuple) {
         if (tuple.containsKey(topicFieldName)) {
-            Object topic =  tuple.get(topicFieldName);
+            Object topic = tuple.get(topicFieldName);
             return topic != null ? topic.toString() : defaultTopicName;
         } else {
-            LOG.warn("Field {} Not Found. Returning default topic {}", topicFieldName, defaultTopicName);
+            LOG.warn(
+                    "Field {} Not Found. Returning default topic {}",
+                    topicFieldName,
+                    defaultTopicName);
             return defaultTopicName;
         }
     }
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java
new file mode 100644
index 0000000..581dadc
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.selector;
+
+import java.io.Serializable;
+
+public interface TopicSelector<T> extends Serializable {
+
+    String getTopic(T tuple);
+
+    String getTag(T tuple);
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/ForwardMessageExtDeserialization.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/ForwardMessageExtDeserialization.java
similarity index 84%
rename from src/main/java/org/apache/rocketmq/flink/common/serialization/ForwardMessageExtDeserialization.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/ForwardMessageExtDeserialization.java
index 20dd700..6c6fa74 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/ForwardMessageExtDeserialization.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/ForwardMessageExtDeserialization.java
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.common.serialization;
+package org.apache.rocketmq.flink.legacy.common.serialization;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.rocketmq.common.message.MessageExt;
 
-/**
- * A Forward messageExt deserialization.
- */
-public class ForwardMessageExtDeserialization implements MessageExtDeserializationScheme<MessageExt> {
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/** A Forward messageExt deserialization. */
+public class ForwardMessageExtDeserialization
+        implements MessageExtDeserializationScheme<MessageExt> {
 
     @Override
     public MessageExt deserializeMessageExt(MessageExt messageExt) {
@@ -34,4 +34,4 @@ public class ForwardMessageExtDeserialization implements MessageExtDeserializati
     public TypeInformation<MessageExt> getProducedType() {
         return TypeInformation.of(MessageExt.class);
     }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java
new file mode 100644
index 0000000..4cc8c61
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.Serializable;
+
+public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T>, Serializable {
+    T deserializeKeyAndValue(byte[] key, byte[] value);
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java
new file mode 100644
index 0000000..66b2e29
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import java.io.Serializable;
+
+public interface KeyValueSerializationSchema<T> extends Serializable {
+
+    byte[] serializeKey(T tuple);
+
+    byte[] serializeValue(T tuple);
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/MessageExtDeserializationScheme.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/MessageExtDeserializationScheme.java
similarity index 95%
rename from src/main/java/org/apache/rocketmq/flink/common/serialization/MessageExtDeserializationScheme.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/MessageExtDeserializationScheme.java
index 4c8cf85..173823e 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/MessageExtDeserializationScheme.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/MessageExtDeserializationScheme.java
@@ -15,12 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.common.serialization;
+package org.apache.rocketmq.flink.legacy.common.serialization;
 
-import java.io.Serializable;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.rocketmq.common.message.MessageExt;
 
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.Serializable;
+
 /**
  * The interface Message ext deserialization scheme.
  *
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
similarity index 60%
rename from src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
index 93d5d9b..7dada93 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
@@ -1,31 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.flink.legacy.common.serialization;
 
-package org.apache.rocketmq.flink.common.serialization;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-
 public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializationSchema<Map> {
     public static final String DEFAULT_KEY_FIELD = "key";
     public static final String DEFAULT_VALUE_FIELD = "value";
@@ -39,8 +33,9 @@ public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializat
 
     /**
      * SimpleKeyValueDeserializationSchema Constructor.
+     *
      * @param keyField tuple field for selecting the key
-     * @param valueField  tuple field for selecting the value
+     * @param valueField tuple field for selecting the value
      */
     public SimpleKeyValueDeserializationSchema(String keyField, String valueField) {
         this.keyField = keyField;
@@ -65,4 +60,4 @@ public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializat
     public TypeInformation<Map> getProducedType() {
         return TypeInformation.of(Map.class);
     }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java
similarity index 60%
rename from src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java
index bbd6da3..3e92ad2 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java
@@ -1,22 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.rocketmq.flink.common.serialization;
+package org.apache.rocketmq.flink.legacy.common.serialization;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
@@ -34,8 +30,9 @@ public class SimpleKeyValueSerializationSchema implements KeyValueSerializationS
 
     /**
      * SimpleKeyValueSerializationSchema Constructor.
+     *
      * @param keyField tuple field for selecting the key
-     * @param valueField  tuple field for selecting the value
+     * @param valueField tuple field for selecting the value
      */
     public SimpleKeyValueSerializationSchema(String keyField, String valueField) {
         this.keyField = keyField;
@@ -59,5 +56,4 @@ public class SimpleKeyValueSerializationSchema implements KeyValueSerializationS
         Object value = tuple.get(valueField);
         return value != null ? value.toString().getBytes(StandardCharsets.UTF_8) : null;
     }
-
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleTupleDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleTupleDeserializationSchema.java
similarity index 71%
rename from src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleTupleDeserializationSchema.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleTupleDeserializationSchema.java
index 54106ef..3bac266 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleTupleDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleTupleDeserializationSchema.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.flink.common.serialization;
+package org.apache.rocketmq.flink.legacy.common.serialization;
 
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -6,7 +6,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 
 import java.nio.charset.StandardCharsets;
 
-public class SimpleTupleDeserializationSchema implements KeyValueDeserializationSchema<Tuple2<String, String>> {
+public class SimpleTupleDeserializationSchema
+        implements KeyValueDeserializationSchema<Tuple2<String, String>> {
 
     @Override
     public Tuple2<String, String> deserializeKeyAndValue(byte[] key, byte[] value) {
@@ -17,6 +18,6 @@ public class SimpleTupleDeserializationSchema implements KeyValueDeserialization
 
     @Override
     public TypeInformation<Tuple2<String, String>> getProducedType() {
-        return TypeInformation.of(new TypeHint<Tuple2<String,String>>(){});
+        return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
new file mode 100644
index 0000000..bb3baeb
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.util;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.SimpleCounter;
+
+/** RocketMQ connector metrics. */
+public class MetricUtils {
+
+    public static final String METRICS_TPS = "tps";
+
+    private static final String METRIC_GROUP_SINK = "sink";
+    private static final String METRICS_SINK_IN_TPS = "inTps";
+    private static final String METRICS_SINK_OUT_TPS = "outTps";
+    private static final String METRICS_SINK_OUT_BPS = "outBps";
+    private static final String METRICS_SINK_OUT_Latency = "outLatency";
+
+    public static Meter registerSinkInTps(RuntimeContext context) {
+        Counter parserCounter =
+                context.getMetricGroup()
+                        .addGroup(METRIC_GROUP_SINK)
+                        .counter(METRICS_SINK_IN_TPS + "_counter", new SimpleCounter());
+        return context.getMetricGroup()
+                .addGroup(METRIC_GROUP_SINK)
+                .meter(METRICS_SINK_IN_TPS, new MeterView(parserCounter, 60));
+    }
+
+    public static Meter registerOutTps(RuntimeContext context) {
+        Counter parserCounter =
+                context.getMetricGroup()
+                        .addGroup(METRIC_GROUP_SINK)
+                        .counter(METRICS_SINK_OUT_TPS + "_counter", new SimpleCounter());
+        return context.getMetricGroup()
+                .addGroup(METRIC_GROUP_SINK)
+                .meter(METRICS_SINK_OUT_TPS, new MeterView(parserCounter, 60));
+    }
+
+    public static Meter registerOutBps(RuntimeContext context) {
+        Counter bpsCounter =
+                context.getMetricGroup()
+                        .addGroup(METRIC_GROUP_SINK)
+                        .counter(METRICS_SINK_OUT_BPS + "_counter", new SimpleCounter());
+        return context.getMetricGroup()
+                .addGroup(METRIC_GROUP_SINK)
+                .meter(METRICS_SINK_OUT_BPS, new MeterView(bpsCounter, 60));
+    }
+
+    public static LatencyGauge registerOutLatency(RuntimeContext context) {
+        return context.getMetricGroup()
+                .addGroup(METRIC_GROUP_SINK)
+                .gauge(METRICS_SINK_OUT_Latency, new LatencyGauge());
+    }
+
+    public static class LatencyGauge implements Gauge<Double> {
+        private double value;
+
+        public void report(long timeDelta, long batchSize) {
+            if (batchSize != 0) {
+                this.value = (1.0 * timeDelta) / batchSize;
+            }
+        }
+
+        @Override
+        public Double getValue() {
+            return value;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/util/RetryUtil.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
similarity index 59%
rename from src/main/java/org/apache/rocketmq/flink/common/util/RetryUtil.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
index 0dbd553..7ec1dca 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/util/RetryUtil.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
@@ -1,22 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.rocketmq.flink.common.util;
+package org.apache.rocketmq.flink.legacy.common.util;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,8 +26,7 @@ public class RetryUtil {
     private static final long MAX_BACKOFF = 5000;
     private static final int MAX_ATTEMPTS = 5;
 
-    private RetryUtil() {
-    }
+    private RetryUtil() {}
 
     public static void waitForMs(long sleepMs) {
         try {
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java
new file mode 100644
index 0000000..94a24a1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.util;
+
+import org.apache.rocketmq.client.AccessChannel;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+public final class RocketMQUtils {
+
+    public static int getInteger(Properties props, String key, int defaultValue) {
+        return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
+    }
+
+    public static long getLong(Properties props, String key, long defaultValue) {
+        return Long.parseLong(props.getProperty(key, String.valueOf(defaultValue)));
+    }
+
+    public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
+        return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
+    }
+
+    public static AccessChannel getAccessChannel(
+            Properties props, String key, AccessChannel defaultValue) {
+        return AccessChannel.valueOf(props.getProperty(key, String.valueOf(defaultValue)));
+    }
+
+    public static String getInstanceName(String... args) {
+        if (null != args && args.length > 0) {
+            return String.join("_", args);
+        }
+        return ManagementFactory.getRuntimeMXBean().getName() + "_" + System.nanoTime();
+    }
+
+    /**
+     * Average Hashing queue algorithm Refer:
+     * org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely
+     */
+    public static List<MessageQueue> allocate(
+            Collection<MessageQueue> mqSet, int numberOfParallelTasks, int indexOfThisTask) {
+        ArrayList<MessageQueue> mqAll = new ArrayList<>(mqSet);
+        Collections.sort(mqAll);
+        List<MessageQueue> result = new ArrayList<>();
+        int mod = mqAll.size() % numberOfParallelTasks;
+        int averageSize =
+                mqAll.size() <= numberOfParallelTasks
+                        ? 1
+                        : (mod > 0 && indexOfThisTask < mod
+                                ? mqAll.size() / numberOfParallelTasks + 1
+                                : mqAll.size() / numberOfParallelTasks);
+        int startIndex =
+                (mod > 0 && indexOfThisTask < mod)
+                        ? indexOfThisTask * averageSize
+                        : indexOfThisTask * averageSize + mod;
+        int range = Math.min(averageSize, mqAll.size() - startIndex);
+        for (int i = 0; i < range; i++) {
+            result.add(mqAll.get((startIndex + i) % mqAll.size()));
+        }
+        return result;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
new file mode 100644
index 0000000..407aec7
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.util;
+
+import java.lang.reflect.Field;
+
+public class TestUtils {
+    public static void setFieldValue(Object obj, String fieldName, Object value) {
+        try {
+            Field field = obj.getClass().getDeclaredField(fieldName);
+            field.setAccessible(true);
+            field.set(obj, value);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/watermark/BoundedOutOfOrdernessGenerator.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGenerator.java
similarity index 84%
rename from src/main/java/org/apache/rocketmq/flink/common/watermark/BoundedOutOfOrdernessGenerator.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGenerator.java
index 7e38f27..2b56f54 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/watermark/BoundedOutOfOrdernessGenerator.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGenerator.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.common.watermark;
+package org.apache.rocketmq.flink.legacy.common.watermark;
+
+import org.apache.rocketmq.common.message.MessageExt;
 
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.rocketmq.common.message.MessageExt;
 
 public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MessageExt> {
 
@@ -27,8 +28,7 @@ public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWater
 
     private long currentMaxTimestamp;
 
-    public BoundedOutOfOrdernessGenerator() {
-    }
+    public BoundedOutOfOrdernessGenerator() {}
 
     public BoundedOutOfOrdernessGenerator(long maxOutOfOrderness) {
         this.maxOutOfOrderness = maxOutOfOrderness;
@@ -49,9 +49,11 @@ public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWater
 
     @Override
     public String toString() {
-        return "BoundedOutOfOrdernessGenerator{" +
-            "maxOutOfOrderness=" + maxOutOfOrderness +
-            ", currentMaxTimestamp=" + currentMaxTimestamp +
-            '}';
+        return "BoundedOutOfOrdernessGenerator{"
+                + "maxOutOfOrderness="
+                + maxOutOfOrderness
+                + ", currentMaxTimestamp="
+                + currentMaxTimestamp
+                + '}';
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
similarity index 79%
rename from src/main/java/org/apache/rocketmq/flink/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
index e56b34c..ab49131 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
@@ -15,27 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.common.watermark;
+package org.apache.rocketmq.flink.legacy.common.watermark;
+
+import org.apache.rocketmq.common.message.MessageExt;
 
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-/**
- * 取每条队列中的最大eventTime的最小值作为当前source的watermark
- */
-public class BoundedOutOfOrdernessGeneratorPerQueue implements AssignerWithPeriodicWatermarks<MessageExt> {
+/** 取每条队列中的最大eventTime的最小值作为当前source的watermark */
+public class BoundedOutOfOrdernessGeneratorPerQueue
+        implements AssignerWithPeriodicWatermarks<MessageExt> {
 
     private Map<String, Long> maxEventTimeTable;
     private long maxOutOfOrderness = 5000L; // 5 seconds
 
-    public BoundedOutOfOrdernessGeneratorPerQueue() {
-    }
+    public BoundedOutOfOrdernessGeneratorPerQueue() {}
 
     public BoundedOutOfOrdernessGeneratorPerQueue(long maxOutOfOrderness) {
         this.maxOutOfOrderness = maxOutOfOrderness;
@@ -63,9 +60,11 @@ public class BoundedOutOfOrdernessGeneratorPerQueue implements AssignerWithPerio
 
     @Override
     public String toString() {
-        return "BoundedOutOfOrdernessGeneratorPerQueue{" +
-                "maxEventTimeTable=" + maxEventTimeTable +
-                ", maxOutOfOrderness=" + maxOutOfOrderness +
-                '}';
+        return "BoundedOutOfOrdernessGeneratorPerQueue{"
+                + "maxEventTimeTable="
+                + maxEventTimeTable
+                + ", maxOutOfOrderness="
+                + maxOutOfOrderness
+                + '}';
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/common/watermark/PunctuatedAssigner.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/PunctuatedAssigner.java
similarity index 64%
rename from src/main/java/org/apache/rocketmq/flink/common/watermark/PunctuatedAssigner.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/PunctuatedAssigner.java
index 354eecc..946e873 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/watermark/PunctuatedAssigner.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/PunctuatedAssigner.java
@@ -15,23 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.common.watermark;
+package org.apache.rocketmq.flink.legacy.common.watermark;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
 
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.flink.RocketMQConfig;
 
 /**
- * With Punctuated Watermarks
- * To generate watermarks whenever a certain event indicates that a new watermark might be generated, use
- * AssignerWithPunctuatedWatermarks. For this class Flink will first call the extractTimestamp(...) method to assign the
- * element a timestamp, and then immediately call the checkAndGetNextWatermark(...) method on that element.
+ * With Punctuated Watermarks To generate watermarks whenever a certain event indicates that a new
+ * watermark might be generated, use AssignerWithPunctuatedWatermarks. For this class Flink will
+ * first call the extractTimestamp(...) method to assign the element a timestamp, and then
+ * immediately call the checkAndGetNextWatermark(...) method on that element.
  *
- * The checkAndGetNextWatermark(...) method is passed the timestamp that was assigned in the extractTimestamp(...)
- * method, and can decide whether it wants to generate a watermark. Whenever the checkAndGetNextWatermark(...) method
- * returns a non-null watermark, and that watermark is larger than the latest previous watermark, that new watermark
- * will be emitted.
+ * <p>The checkAndGetNextWatermark(...) method is passed the timestamp that was assigned in the
+ * extractTimestamp(...) method, and can decide whether it wants to generate a watermark. Whenever
+ * the checkAndGetNextWatermark(...) method returns a non-null watermark, and that watermark is
+ * larger than the latest previous watermark, that new watermark will be emitted.
  */
 public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MessageExt> {
     @Override
diff --git a/src/main/java/org/apache/rocketmq/flink/common/watermark/TimeLagWatermarkGenerator.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/TimeLagWatermarkGenerator.java
similarity index 82%
rename from src/main/java/org/apache/rocketmq/flink/common/watermark/TimeLagWatermarkGenerator.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/TimeLagWatermarkGenerator.java
index beec8f3..66cb3cd 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/watermark/TimeLagWatermarkGenerator.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/TimeLagWatermarkGenerator.java
@@ -15,21 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.common.watermark;
+package org.apache.rocketmq.flink.legacy.common.watermark;
+
+import org.apache.rocketmq.common.message.MessageExt;
 
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.rocketmq.common.message.MessageExt;
 
 /**
- * This generator generates watermarks that are lagging behind processing time by a certain amount. It assumes that
- * elements arrive in Flink after at most a certain time.
+ * This generator generates watermarks that are lagging behind processing time by a certain amount.
+ * It assumes that elements arrive in Flink after at most a certain time.
  */
 public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MessageExt> {
     private long maxTimeLag = 5000; // 5 seconds
 
-    TimeLagWatermarkGenerator() {
-    }
+    TimeLagWatermarkGenerator() {}
 
     TimeLagWatermarkGenerator(long maxTimeLag) {
         this.maxTimeLag = maxTimeLag;
@@ -46,9 +46,8 @@ public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks
         return new Watermark(System.currentTimeMillis() - maxTimeLag);
     }
 
-    @Override public String toString() {
-        return "TimeLagWatermarkGenerator{" +
-            "maxTimeLag=" + maxTimeLag +
-            '}';
+    @Override
+    public String toString() {
+        return "TimeLagWatermarkGenerator{" + "maxTimeLag=" + maxTimeLag + '}';
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/common/watermark/WaterMarkForAll.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/WaterMarkForAll.java
similarity index 83%
rename from src/main/java/org/apache/rocketmq/flink/common/watermark/WaterMarkForAll.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/WaterMarkForAll.java
index a80fb69..8fadd77 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/watermark/WaterMarkForAll.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/WaterMarkForAll.java
@@ -15,14 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.common.watermark;
+package org.apache.rocketmq.flink.legacy.common.watermark;
 
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.rocketmq.common.message.MessageQueue;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 public class WaterMarkForAll {
 
@@ -30,8 +25,7 @@ public class WaterMarkForAll {
 
     private long maxTimestamp = 0L;
 
-    public WaterMarkForAll() {
-    }
+    public WaterMarkForAll() {}
 
     public WaterMarkForAll(long maxOutOfOrderness) {
         this.maxOutOfOrderness = maxOutOfOrderness;
diff --git a/src/main/java/org/apache/rocketmq/flink/common/watermark/WaterMarkPerQueue.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/WaterMarkPerQueue.java
similarity index 87%
rename from src/main/java/org/apache/rocketmq/flink/common/watermark/WaterMarkPerQueue.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/WaterMarkPerQueue.java
index 2210cfb..941dec5 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/watermark/WaterMarkPerQueue.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/WaterMarkPerQueue.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.common.watermark;
+package org.apache.rocketmq.flink.legacy.common.watermark;
 
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.rocketmq.common.message.MessageQueue;
 
+import org.apache.flink.streaming.api.watermark.Watermark;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -30,8 +31,7 @@ public class WaterMarkPerQueue {
 
     private long maxOutOfOrderness = 5000L; // 5 seconds
 
-    public WaterMarkPerQueue() {
-    }
+    public WaterMarkPerQueue() {}
 
     public WaterMarkPerQueue(long maxOutOfOrderness) {
         this.maxOutOfOrderness = maxOutOfOrderness;
@@ -54,9 +54,11 @@ public class WaterMarkPerQueue {
 
     @Override
     public String toString() {
-        return "WaterMarkPerQueue{" +
-                "maxEventTimeTable=" + maxEventTimeTable +
-                ", maxOutOfOrderness=" + maxOutOfOrderness +
-                '}';
+        return "WaterMarkPerQueue{"
+                + "maxEventTimeTable="
+                + maxEventTimeTable
+                + ", maxOutOfOrderness="
+                + maxOutOfOrderness
+                + '}';
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java
similarity index 77%
rename from src/main/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java
index 1f24d96..b435726 100644
--- a/src/main/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java
@@ -15,7 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.example;
+package org.apache.rocketmq.flink.legacy.example;
+
+import org.apache.rocketmq.client.AccessChannel;
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
+import org.apache.rocketmq.flink.legacy.RocketMQSink;
+import org.apache.rocketmq.flink.legacy.RocketMQSource;
+import org.apache.rocketmq.flink.legacy.common.serialization.SimpleTupleDeserializationSchema;
+import org.apache.rocketmq.flink.legacy.function.SinkMapFunction;
+import org.apache.rocketmq.flink.legacy.function.SourceMapFunction;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
@@ -25,29 +33,23 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.flink.RocketMQConfig;
-import org.apache.rocketmq.flink.RocketMQSink;
-import org.apache.rocketmq.flink.RocketMQSource;
-import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema;
-import org.apache.rocketmq.flink.common.serialization.SimpleTupleDeserializationSchema;
-import org.apache.rocketmq.flink.function.SinkMapFunction;
-import org.apache.rocketmq.flink.function.SourceMapFunction;
 
 import java.util.Properties;
 
-import static org.apache.rocketmq.flink.RocketMQConfig.CONSUMER_OFFSET_LATEST;
-import static org.apache.rocketmq.flink.RocketMQConfig.DEFAULT_CONSUMER_TAG;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
+import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_TAG;
 
 public class RocketMQFlinkExample {
 
     /**
      * Source Config
+     *
      * @return properties
      */
     private static Properties getConsumerProps() {
         Properties consumerProps = new Properties();
-        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR,
+        consumerProps.setProperty(
+                RocketMQConfig.NAME_SERVER_ADDR,
                 "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080");
         consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "${ConsumerGroup}");
         consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "${SourceTopic}");
@@ -61,11 +63,13 @@ public class RocketMQFlinkExample {
 
     /**
      * Sink Config
+     *
      * @return properties
      */
     private static Properties getProducerProps() {
         Properties producerProps = new Properties();
-        producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR,
+        producerProps.setProperty(
+                RocketMQConfig.NAME_SERVER_ADDR,
                 "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080");
         producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, "${ProducerGroup}");
         producerProps.setProperty(RocketMQConfig.ACCESS_KEY, "${AccessKey}");
@@ -100,22 +104,26 @@ public class RocketMQFlinkExample {
         // allow only one checkpoint to be in progress at the same time
         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
         // enable externalized checkpoints which are retained after job cancellation
-        env.getCheckpointConfig().enableExternalizedCheckpoints(
-                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.getCheckpointConfig()
+                .enableExternalizedCheckpoints(
+                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
         Properties consumerProps = getConsumerProps();
         Properties producerProps = getProducerProps();
 
         SimpleTupleDeserializationSchema schema = new SimpleTupleDeserializationSchema();
 
-        DataStreamSource<Tuple2<String, String>> source = env.addSource(
-                new RocketMQSource<>(schema, consumerProps)).setParallelism(2);
+        DataStreamSource<Tuple2<String, String>> source =
+                env.addSource(new RocketMQSource<>(schema, consumerProps)).setParallelism(2);
 
         source.print();
         source.process(new SourceMapFunction())
                 .process(new SinkMapFunction("FLINK_SINK", "*"))
-                .addSink(new RocketMQSink(producerProps).withBatchFlushOnCheckpoint(true).withBatchSize(32)
-                        .withAsync(true))
+                .addSink(
+                        new RocketMQSink(producerProps)
+                                .withBatchFlushOnCheckpoint(true)
+                                .withBatchSize(32)
+                                .withAsync(true))
                 .setParallelism(2);
 
         env.execute("rocketmq-connect-flink");
diff --git a/src/main/java/org/apache/rocketmq/flink/example/SimpleConsumer.java b/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleConsumer.java
similarity index 72%
rename from src/main/java/org/apache/rocketmq/flink/example/SimpleConsumer.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleConsumer.java
index 601d37d..9e025bb 100644
--- a/src/main/java/org/apache/rocketmq/flink/example/SimpleConsumer.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleConsumer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.example;
+package org.apache.rocketmq.flink.legacy.example;
 
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -28,6 +28,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +37,8 @@ public class SimpleConsumer {
     private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);
 
     // Consumer config
-    private static final String NAME_SERVER_ADDR = "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080";
+    private static final String NAME_SERVER_ADDR =
+            "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080";
     private static final String GROUP = "GID_SIMPLE_CONSUMER";
     private static final String TOPIC = "SINK_TOPIC";
     private static final String TAGS = "*";
@@ -48,8 +50,9 @@ public class SimpleConsumer {
     }
 
     public static void main(String[] args) {
-        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
-                GROUP, getAclRPCHook(), new AllocateMessageQueueAveragely());
+        DefaultMQPushConsumer consumer =
+                new DefaultMQPushConsumer(
+                        GROUP, getAclRPCHook(), new AllocateMessageQueueAveragely());
         consumer.setNamesrvAddr(NAME_SERVER_ADDR);
 
         // When using aliyun products, you need to set up channels
@@ -62,13 +65,19 @@ public class SimpleConsumer {
         }
 
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
-            for (MessageExt msg : msgs) {
-                System.out.printf("%s %s %d %s\n", msg.getMsgId(), msg.getBrokerName(), msg.getQueueId(),
-                        new String(msg.getBody()));
-            }
-            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-        });
+        consumer.registerMessageListener(
+                (MessageListenerConcurrently)
+                        (msgs, context) -> {
+                            for (MessageExt msg : msgs) {
+                                System.out.printf(
+                                        "%s %s %d %s\n",
+                                        msg.getMsgId(),
+                                        msg.getBrokerName(),
+                                        msg.getQueueId(),
+                                        new String(msg.getBody()));
+                            }
+                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                        });
 
         try {
             consumer.start();
diff --git a/src/main/java/org/apache/rocketmq/flink/example/SimpleProducer.java b/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleProducer.java
similarity index 88%
rename from src/main/java/org/apache/rocketmq/flink/example/SimpleProducer.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleProducer.java
index 9d7ba45..ea24f60 100644
--- a/src/main/java/org/apache/rocketmq/flink/example/SimpleProducer.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleProducer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.example;
+package org.apache.rocketmq.flink.legacy.example;
 
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -24,8 +24,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.flink.RocketMQSource;
 import org.apache.rocketmq.remoting.RPCHook;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +36,8 @@ public class SimpleProducer {
     private static final int MESSAGE_NUM = 10000;
 
     // Producer config
-    private static final String NAME_SERVER_ADDR = "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080";
+    private static final String NAME_SERVER_ADDR =
+            "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080";
     private static final String PRODUCER_GROUP = "GID_SIMPLE_PRODUCER";
     private static final String TOPIC = "SOURCE_TOPIC";
     private static final String TAGS = "*";
@@ -49,8 +50,8 @@ public class SimpleProducer {
     }
 
     public static void main(String[] args) {
-        DefaultMQProducer producer = new DefaultMQProducer(
-                PRODUCER_GROUP, getAclRPCHook(), true, null);
+        DefaultMQProducer producer =
+                new DefaultMQProducer(PRODUCER_GROUP, getAclRPCHook(), true, null);
         producer.setNamesrvAddr(NAME_SERVER_ADDR);
 
         // When using aliyun products, you need to set up channels
@@ -68,7 +69,8 @@ public class SimpleProducer {
             try {
                 SendResult sendResult = producer.send(msg);
                 assert sendResult != null;
-                System.out.printf("send result: %s %s\n",
+                System.out.printf(
+                        "send result: %s %s\n",
                         sendResult.getMsgId(), sendResult.getMessageQueue().toString());
                 Thread.sleep(50);
             } catch (Exception e) {
diff --git a/src/main/java/org/apache/rocketmq/flink/function/SinkMapFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/function/SinkMapFunction.java
similarity index 91%
rename from src/main/java/org/apache/rocketmq/flink/function/SinkMapFunction.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/function/SinkMapFunction.java
index c3a6af5..f63b636 100644
--- a/src/main/java/org/apache/rocketmq/flink/function/SinkMapFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/function/SinkMapFunction.java
@@ -15,13 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.function;
+package org.apache.rocketmq.flink.legacy.function;
+
+import org.apache.rocketmq.common.message.Message;
 
-import org.apache.commons.lang.Validate;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
-import org.apache.rocketmq.common.message.Message;
+
+import org.apache.commons.lang.Validate;
 
 public class SinkMapFunction extends ProcessFunction<Tuple2<String, String>, Message> {
 
@@ -29,8 +31,7 @@ public class SinkMapFunction extends ProcessFunction<Tuple2<String, String>, Mes
 
     private String tag;
 
-    public SinkMapFunction() {
-    }
+    public SinkMapFunction() {}
 
     public SinkMapFunction(String topic, String tag) {
         this.topic = topic;
@@ -38,7 +39,8 @@ public class SinkMapFunction extends ProcessFunction<Tuple2<String, String>, Mes
     }
 
     @Override
-    public void processElement(Tuple2<String, String> tuple, Context ctx, Collector<Message> out) throws Exception {
+    public void processElement(Tuple2<String, String> tuple, Context ctx, Collector<Message> out)
+            throws Exception {
         Validate.notNull(topic, "the message topic is null");
         Validate.notNull(tuple.f1.getBytes(), "the message body is null");
 
diff --git a/src/main/java/org/apache/rocketmq/flink/function/SourceMapFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/function/SourceMapFunction.java
similarity index 76%
rename from src/main/java/org/apache/rocketmq/flink/function/SourceMapFunction.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/function/SourceMapFunction.java
index 8dd07c6..a49df95 100644
--- a/src/main/java/org/apache/rocketmq/flink/function/SourceMapFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/function/SourceMapFunction.java
@@ -15,16 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.function;
+package org.apache.rocketmq.flink.legacy.function;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 
-public class SourceMapFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>> {
+public class SourceMapFunction
+        extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>> {
 
     @Override
-    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
+    public void processElement(
+            Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out)
+            throws Exception {
         out.collect(new Tuple2<>(value.f0, value.f1));
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
new file mode 100644
index 0000000..b899618
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source;
+
+import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumState;
+import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumStateSerializer;
+import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator;
+import org.apache.rocketmq.flink.source.reader.RocketMQPartitionSplitReader;
+import org.apache.rocketmq.flink.source.reader.RocketMQRecordEmitter;
+import org.apache.rocketmq.flink.source.reader.RocketMQSourceReader;
+import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQRecordDeserializationSchema;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitSerializer;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.util.function.Supplier;
+
+/** The Source implementation of RocketMQ. */
+public class RocketMQSource<OUT>
+        implements Source<OUT, RocketMQPartitionSplit, RocketMQSourceEnumState>,
+                ResultTypeQueryable<OUT> {
+    private static final long serialVersionUID = -6755372893283732098L;
+
+    private final String topic;
+    private final String consumerGroup;
+    private final String tag;
+    private final long stopInMs;
+    private final long startTime;
+    private final long startOffset;
+    private final long partitionDiscoveryIntervalMs;
+
+    // Boundedness
+    private final Boundedness boundedness;
+    private final RocketMQRecordDeserializationSchema<OUT> deserializationSchema;
+
+    public RocketMQSource(
+            String topic,
+            String consumerGroup,
+            String tag,
+            long stopInMs,
+            long startTime,
+            long startOffset,
+            long partitionDiscoveryIntervalMs,
+            Boundedness boundedness,
+            RocketMQRecordDeserializationSchema<OUT> deserializationSchema) {
+        this.topic = topic;
+        this.consumerGroup = consumerGroup;
+        this.tag = tag;
+        this.stopInMs = stopInMs;
+        this.startTime = startTime;
+        this.startOffset = startOffset;
+        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
+        this.boundedness = boundedness;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return this.boundedness;
+    }
+
+    @Override
+    public SourceReader<OUT, RocketMQPartitionSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<OUT, Long, Long>>> elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        deserializationSchema.open(
+                new DeserializationSchema.InitializationContext() {
+                    @Override
+                    public MetricGroup getMetricGroup() {
+                        return readerContext.metricGroup();
+                    }
+
+                    @Override
+                    public UserCodeClassLoader getUserCodeClassLoader() {
+                        return null;
+                    }
+                });
+
+        Supplier<SplitReader<Tuple3<OUT, Long, Long>, RocketMQPartitionSplit>> splitReaderSupplier =
+                () ->
+                        new RocketMQPartitionSplitReader<>(
+                                topic,
+                                consumerGroup,
+                                tag,
+                                stopInMs,
+                                startTime,
+                                startOffset,
+                                deserializationSchema);
+        RocketMQRecordEmitter<OUT> recordEmitter = new RocketMQRecordEmitter<>();
+
+        return new RocketMQSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                recordEmitter,
+                new Configuration(),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> createEnumerator(
+            SplitEnumeratorContext<RocketMQPartitionSplit> enumContext) {
+        return new RocketMQSourceEnumerator(
+                topic,
+                consumerGroup,
+                stopInMs,
+                startOffset,
+                partitionDiscoveryIntervalMs,
+                boundedness,
+                enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> restoreEnumerator(
+            SplitEnumeratorContext<RocketMQPartitionSplit> enumContext,
+            RocketMQSourceEnumState checkpoint) {
+        return new RocketMQSourceEnumerator(
+                topic,
+                consumerGroup,
+                stopInMs,
+                startOffset,
+                partitionDiscoveryIntervalMs,
+                boundedness,
+                enumContext,
+                checkpoint.getCurrentAssignment());
+    }
+
+    @Override
+    public SimpleVersionedSerializer<RocketMQPartitionSplit> getSplitSerializer() {
+        return new RocketMQPartitionSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<RocketMQSourceEnumState> getEnumeratorCheckpointSerializer() {
+        return new RocketMQSourceEnumStateSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumState.java
similarity index 54%
rename from src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java
rename to src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumState.java
index 264d211..a23139f 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumState.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,28 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.common.selector;
+package org.apache.rocketmq.flink.source.enumerator;
 
-public class DefaultTopicSelector<T> implements TopicSelector<T> {
-    private final String topicName;
-    private final String tagName;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
 
-    public DefaultTopicSelector(final String topicName, final String tagName) {
-        this.topicName = topicName;
-        this.tagName = tagName;
-    }
+import java.util.List;
+import java.util.Map;
 
-    public DefaultTopicSelector(final String topicName) {
-        this(topicName, "");
-    }
+/** The state of RocketMQ source enumerator. */
+public class RocketMQSourceEnumState {
+
+    private final Map<Integer, List<RocketMQPartitionSplit>> currentAssignment;
 
-    @Override
-    public String getTopic(T tuple) {
-        return topicName;
+    RocketMQSourceEnumState(Map<Integer, List<RocketMQPartitionSplit>> currentAssignment) {
+        this.currentAssignment = currentAssignment;
     }
 
-    @Override
-    public String getTag(T tuple) {
-        return tagName;
+    public Map<Integer, List<RocketMQPartitionSplit>> getCurrentAssignment() {
+        return currentAssignment;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumStateSerializer.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumStateSerializer.java
new file mode 100644
index 0000000..ce45b51
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumStateSerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.enumerator;
+
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitSerializer;
+
+import org.apache.flink.connector.base.source.utils.SerdeUtils;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** The {@link SimpleVersionedSerializer Serializer} for the enumerator state of RocketMQ source. */
+public class RocketMQSourceEnumStateSerializer
+        implements SimpleVersionedSerializer<RocketMQSourceEnumState> {
+
+    private static final int CURRENT_VERSION = 0;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(RocketMQSourceEnumState enumState) throws IOException {
+        return SerdeUtils.serializeSplitAssignments(
+                enumState.getCurrentAssignment(), new RocketMQPartitionSplitSerializer());
+    }
+
+    @Override
+    public RocketMQSourceEnumState deserialize(int version, byte[] serialized) throws IOException {
+        // Check whether the version of serialized bytes is supported.
+        if (version == getVersion()) {
+            Map<Integer, List<RocketMQPartitionSplit>> currentPartitionAssignment =
+                    SerdeUtils.deserializeSplitAssignments(
+                            serialized, new RocketMQPartitionSplitSerializer(), ArrayList::new);
+            return new RocketMQSourceEnumState(currentPartitionAssignment);
+        }
+        throw new IOException(
+                String.format(
+                        "The bytes are serialized with version %d, "
+                                + "while this deserializer only supports version up to %d",
+                        version, getVersion()));
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
new file mode 100644
index 0000000..08290c6
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.enumerator;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** The enumerator class for RocketMQ source. */
+@Internal
+public class RocketMQSourceEnumerator
+        implements SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQSourceEnumerator.class);
+
+    /** The topic used for this RocketMQSource. */
+    private final String topic;
+    /** The consumer group used for this RocketMQSource. */
+    private final String consumerGroup;
+    /** The stop timestamp for this RocketMQSource. */
+    private final long stopInMs;
+    /** The start offset for this RocketMQSource. */
+    private final long startOffset;
+    /** The partition discovery interval for this RocketMQSource. */
+    private final long partitionDiscoveryIntervalMs;
+    /** The boundedness of this RocketMQSource. */
+    private final Boundedness boundedness;
+
+    private final SplitEnumeratorContext<RocketMQPartitionSplit> context;
+
+    // The internal states of the enumerator.
+    /**
+     * This set is only accessed by the partition discovery callable in the callAsync() method, i.e
+     * worker thread.
+     */
+    private final Set<Tuple3<String, String, Integer>> discoveredPartitions;
+    /** The current assignment by reader id. Only accessed by the coordinator thread. */
+    private final Map<Integer, List<RocketMQPartitionSplit>> readerIdToSplitAssignments;
+    /**
+     * The discovered and initialized partition splits that are waiting for owner reader to be
+     * ready.
+     */
+    private final Map<Integer, Set<RocketMQPartitionSplit>> pendingPartitionSplitAssignment;
+
+    // Lazily instantiated or mutable fields.
+    private MQPullConsumer consumer;
+    private boolean noMoreNewPartitionSplits = false;
+
+    public RocketMQSourceEnumerator(
+            String topic,
+            String consumerGroup,
+            long stopInMs,
+            long startOffset,
+            long partitionDiscoveryIntervalMs,
+            Boundedness boundedness,
+            SplitEnumeratorContext<RocketMQPartitionSplit> context) {
+        this(
+                topic,
+                consumerGroup,
+                stopInMs,
+                startOffset,
+                partitionDiscoveryIntervalMs,
+                boundedness,
+                context,
+                new HashMap<>());
+    }
+
+    public RocketMQSourceEnumerator(
+            String topic,
+            String consumerGroup,
+            long stopInMs,
+            long startOffset,
+            long partitionDiscoveryIntervalMs,
+            Boundedness boundedness,
+            SplitEnumeratorContext<RocketMQPartitionSplit> context,
+            Map<Integer, List<RocketMQPartitionSplit>> currentSplitsAssignments) {
+        this.topic = topic;
+        this.consumerGroup = consumerGroup;
+        this.stopInMs = stopInMs;
+        this.startOffset = startOffset;
+        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
+        this.boundedness = boundedness;
+        this.context = context;
+
+        this.discoveredPartitions = new HashSet<>();
+        this.readerIdToSplitAssignments = new HashMap<>(currentSplitsAssignments);
+        this.readerIdToSplitAssignments.forEach(
+                (reader, splits) ->
+                        splits.forEach(
+                                s ->
+                                        discoveredPartitions.add(
+                                                new Tuple3<>(
+                                                        s.getTopic(),
+                                                        s.getBroker(),
+                                                        s.getPartition()))));
+        this.pendingPartitionSplitAssignment = new HashMap<>();
+    }
+
+    @Override
+    public void start() {
+        initialRocketMQConsumer();
+        LOG.info(
+                "Starting the RocketMQSourceEnumerator for consumer group {} "
+                        + "with partition discovery interval of {} ms.",
+                consumerGroup,
+                partitionDiscoveryIntervalMs);
+        context.callAsync(
+                this::discoverAndInitializePartitionSplit,
+                this::handlePartitionSplitChanges,
+                0,
+                partitionDiscoveryIntervalMs);
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+        // the RocketMQ source pushes splits eagerly, rather than act upon split requests
+    }
+
+    @Override
+    public void addSplitsBack(List<RocketMQPartitionSplit> splits, int subtaskId) {
+        addPartitionSplitChangeToPendingAssignments(splits);
+        assignPendingPartitionSplits();
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        LOG.debug(
+                "Adding reader {} to RocketMQSourceEnumerator for consumer group {}.",
+                subtaskId,
+                consumerGroup);
+        assignPendingPartitionSplits();
+        if (boundedness == Boundedness.BOUNDED) {
+            // for RocketMQ bounded source, send this signal to ensure the task can end after all
+            // the
+            // splits assigned are completed.
+            context.signalNoMoreSplits(subtaskId);
+        }
+    }
+
+    @Override
+    public RocketMQSourceEnumState snapshotState() {
+        return new RocketMQSourceEnumState(readerIdToSplitAssignments);
+    }
+
+    @Override
+    public void close() {
+        if (consumer != null) {
+            consumer.shutdown();
+        }
+    }
+
+    // ----------------- private methods -------------------
+
+    private Set<RocketMQPartitionSplit> discoverAndInitializePartitionSplit()
+            throws MQClientException {
+        Set<Tuple3<String, String, Integer>> newPartitions = new HashSet<>();
+        Set<Tuple3<String, String, Integer>> removedPartitions =
+                new HashSet<>(Collections.unmodifiableSet(discoveredPartitions));
+        Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(topic);
+        for (MessageQueue messageQueue : messageQueues) {
+            Tuple3<String, String, Integer> topicPartition =
+                    new Tuple3<>(
+                            messageQueue.getTopic(),
+                            messageQueue.getBrokerName(),
+                            messageQueue.getQueueId());
+            if (!removedPartitions.remove(topicPartition)) {
+                newPartitions.add(topicPartition);
+            }
+        }
+        discoveredPartitions.addAll(Collections.unmodifiableSet(newPartitions));
+        return newPartitions.stream()
+                .map(
+                        messageQueue ->
+                                new RocketMQPartitionSplit(
+                                        messageQueue.f0,
+                                        messageQueue.f1,
+                                        messageQueue.f2,
+                                        startOffset,
+                                        stopInMs))
+                .collect(Collectors.toSet());
+    }
+
+    // This method should only be invoked in the coordinator executor thread.
+    private void handlePartitionSplitChanges(
+            Set<RocketMQPartitionSplit> partitionSplits, Throwable t) {
+        if (t != null) {
+            throw new FlinkRuntimeException("Failed to handle partition splits change due to ", t);
+        }
+        if (partitionDiscoveryIntervalMs < 0) {
+            LOG.debug("");
+            noMoreNewPartitionSplits = true;
+        }
+        addPartitionSplitChangeToPendingAssignments(partitionSplits);
+        assignPendingPartitionSplits();
+    }
+
+    // This method should only be invoked in the coordinator executor thread.
+    private void addPartitionSplitChangeToPendingAssignments(
+            Collection<RocketMQPartitionSplit> newPartitionSplits) {
+        int numReaders = context.currentParallelism();
+        for (RocketMQPartitionSplit split : newPartitionSplits) {
+            int ownerReader =
+                    getSplitOwner(
+                            split.getTopic(), split.getBroker(), split.getPartition(), numReaders);
+            pendingPartitionSplitAssignment
+                    .computeIfAbsent(ownerReader, r -> new HashSet<>())
+                    .add(split);
+        }
+        LOG.debug(
+                "Assigned {} to {} readers of consumer group {}.",
+                newPartitionSplits,
+                numReaders,
+                consumerGroup);
+    }
+
+    // This method should only be invoked in the coordinator executor thread.
+    private void assignPendingPartitionSplits() {
+        Map<Integer, List<RocketMQPartitionSplit>> incrementalAssignment = new HashMap<>();
+        pendingPartitionSplitAssignment.forEach(
+                (ownerReader, pendingSplits) -> {
+                    if (!pendingSplits.isEmpty()
+                            && context.registeredReaders().containsKey(ownerReader)) {
+                        // The owner reader is ready, assign the split to the owner reader.
+                        incrementalAssignment
+                                .computeIfAbsent(ownerReader, r -> new ArrayList<>())
+                                .addAll(pendingSplits);
+                    }
+                });
+        if (incrementalAssignment.isEmpty()) {
+            // No assignment is made.
+            return;
+        }
+
+        LOG.info("Assigning splits to readers {}", incrementalAssignment);
+        context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
+        incrementalAssignment.forEach(
+                (readerOwner, newPartitionSplits) -> {
+                    // Update the split assignment.
+                    readerIdToSplitAssignments
+                            .computeIfAbsent(readerOwner, r -> new ArrayList<>())
+                            .addAll(newPartitionSplits);
+                    // Clear the pending splits for the reader owner.
+                    pendingPartitionSplitAssignment.remove(readerOwner);
+                    // Sends NoMoreSplitsEvent to the readers if there is no more partition splits
+                    // to be assigned.
+                    if (noMoreNewPartitionSplits) {
+                        LOG.debug(
+                                "No more RocketMQPartitionSplits to assign. Sending NoMoreSplitsEvent to the readers "
+                                        + "in consumer group {}.",
+                                consumerGroup);
+                        context.signalNoMoreSplits(readerOwner);
+                    }
+                });
+    }
+
+    private void initialRocketMQConsumer() {
+        try {
+            consumer = new DefaultMQPullConsumer(consumerGroup);
+            consumer.start();
+        } catch (MQClientException e) {
+            LOG.error("Failed to initial RocketMQ consumer.", e);
+            consumer.shutdown();
+        }
+    }
+
+    /**
+     * Returns the index of the target subtask that a specific RocketMQ partition should be assigned
+     * to.
+     *
+     * <p>The resulting distribution of partitions of a single topic has the following contract:
+     *
+     * <ul>
+     *   <li>1. Uniformly distributed across subtasks
+     *   <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending subtask
+     *       indices) by using the partition id as the offset from a starting index (i.e., the index
+     *       of the subtask which partition 0 of the topic will be assigned to, determined using the
+     *       topic name).
+     * </ul>
+     *
+     * @param topic the RocketMQ topic assigned.
+     * @param broker the RocketMQ broker assigned.
+     * @param partition the RocketMQ partition to assign.
+     * @param numReaders the total number of readers.
+     * @return the id of the subtask that owns the split.
+     */
+    @VisibleForTesting
+    static int getSplitOwner(String topic, String broker, int partition, int numReaders) {
+        int startIndex = (((topic + "-" + broker).hashCode() * 31) & 0x7FFFFFFF) % numReaders;
+
+        // here, the assumption is that the id of RocketMQ partitions are always ascending
+        // starting from 0, and therefore can be used directly as the offset clockwise from the
+        // start index
+        return (startIndex + partition) % numReaders;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
new file mode 100644
index 0000000..3bbeec8
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQRecordDeserializationSchema;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.rocketmq.client.consumer.PullStatus.FOUND;
+
+/**
+ * A {@link SplitReader} implementation that reads records from RocketMQ partitions.
+ *
+ * <p>The returned type are in the format of {@code tuple3(record, offset and timestamp}.
+ */
+public class RocketMQPartitionSplitReader<T>
+        implements SplitReader<Tuple3<T, Long, Long>, RocketMQPartitionSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQPartitionSplitReader.class);
+
+    private final String topic;
+    private final String tag;
+    private final long stopInMs;
+    private final long startTime;
+    private final long startOffset;
+
+    private final RocketMQRecordDeserializationSchema<T> deserializationSchema;
+    private final Map<Tuple3<String, String, Integer>, Long> startingOffsets;
+    private final Map<Tuple3<String, String, Integer>, Long> stoppingTimestamps;
+    private final SimpleCollector<T> collector;
+
+    private MQPullConsumer consumer;
+
+    private volatile boolean wakeup = false;
+
+    private static final int MAX_MESSAGE_NUMBER_PER_BLOCK = 64;
+
+    public RocketMQPartitionSplitReader(
+            String topic,
+            String consumerGroup,
+            String tag,
+            long stopInMs,
+            long startTime,
+            long startOffset,
+            RocketMQRecordDeserializationSchema<T> deserializationSchema) {
+        this.topic = topic;
+        this.tag = tag;
+        this.stopInMs = stopInMs;
+        this.startTime = startTime;
+        this.startOffset = startOffset;
+        this.deserializationSchema = deserializationSchema;
+        this.startingOffsets = new HashMap<>();
+        this.stoppingTimestamps = new HashMap<>();
+        this.collector = new SimpleCollector<>();
+        initialRocketMQConsumer(consumerGroup);
+    }
+
+    @Override
+    public RecordsWithSplitIds<Tuple3<T, Long, Long>> fetch() throws IOException {
+        RocketMQPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits =
+                new RocketMQPartitionSplitRecords<>();
+        Set<MessageQueue> messageQueues;
+        try {
+            messageQueues = consumer.fetchSubscribeMessageQueues(topic);
+        } catch (MQClientException e) {
+            LOG.error(
+                    String.format(
+                            "Fetch RocketMQ subscribe message queues of topic[%s] exception.",
+                            topic),
+                    e);
+            recordsBySplits.prepareForRead();
+            return recordsBySplits;
+        }
+        for (MessageQueue messageQueue : messageQueues) {
+            Tuple3<String, String, Integer> topicPartition =
+                    new Tuple3<>(
+                            messageQueue.getTopic(),
+                            messageQueue.getBrokerName(),
+                            messageQueue.getQueueId());
+            if (startingOffsets.containsKey(topicPartition)) {
+                long messageOffset = startingOffsets.get(topicPartition);
+                if (messageOffset == 0) {
+                    try {
+                        messageOffset =
+                                startTime > 0
+                                        ? consumer.searchOffset(messageQueue, startTime)
+                                        : startOffset;
+                    } catch (MQClientException e) {
+                        LOG.error(
+                                String.format(
+                                        "Search RocketMQ message offset of topic[%s] broker[%s] queue[%d] exception.",
+                                        messageQueue.getTopic(),
+                                        messageQueue.getBrokerName(),
+                                        messageQueue.getQueueId()),
+                                e);
+                    }
+                    messageOffset = messageOffset > -1 ? messageOffset : 0;
+                }
+                PullResult pullResult = null;
+                try {
+                    if (wakeup) {
+                        LOG.info(
+                                String.format(
+                                        "Wake up pulling messages of topic[%s] broker[%s] queue[%d] tag[%s] from offset[%d].",
+                                        messageQueue.getTopic(),
+                                        messageQueue.getBrokerName(),
+                                        messageQueue.getQueueId(),
+                                        tag,
+                                        messageOffset));
+                        wakeup = false;
+                        recordsBySplits.prepareForRead();
+                        return recordsBySplits;
+                    }
+                    pullResult =
+                            consumer.pullBlockIfNotFound(
+                                    messageQueue, tag, messageOffset, MAX_MESSAGE_NUMBER_PER_BLOCK);
+                } catch (MQClientException
+                        | RemotingException
+                        | MQBrokerException
+                        | InterruptedException e) {
+                    LOG.error(
+                            String.format(
+                                    "Pull RocketMQ messages of topic[%s] broker[%s] queue[%d] tag[%s] from offset[%d] exception.",
+                                    messageQueue.getTopic(),
+                                    messageQueue.getBrokerName(),
+                                    messageQueue.getQueueId(),
+                                    tag,
+                                    messageOffset),
+                            e);
+                }
+                startingOffsets.put(
+                        topicPartition,
+                        pullResult == null ? messageOffset : pullResult.getNextBeginOffset());
+                if (pullResult != null && pullResult.getPullStatus() == FOUND) {
+                    Collection<Tuple3<T, Long, Long>> recordsForSplit =
+                            recordsBySplits.recordsForSplit(
+                                    messageQueue.getTopic()
+                                            + "-"
+                                            + messageQueue.getBrokerName()
+                                            + "-"
+                                            + messageQueue.getQueueId());
+                    for (MessageExt messageExt : pullResult.getMsgFoundList()) {
+                        long stoppingTimestamp = getStoppingTimestamp(topicPartition);
+                        long storeTimestamp = messageExt.getStoreTimestamp();
+                        if (storeTimestamp > stoppingTimestamp) {
+                            finishSplitAtRecord(
+                                    topicPartition,
+                                    stoppingTimestamp,
+                                    messageExt.getQueueOffset(),
+                                    recordsBySplits);
+                            break;
+                        }
+                        // Add the record to the partition collector.
+                        try {
+                            deserializationSchema.deserialize(
+                                    Collections.singletonList(messageExt), collector);
+                            collector
+                                    .getRecords()
+                                    .forEach(
+                                            r ->
+                                                    recordsForSplit.add(
+                                                            new Tuple3<>(
+                                                                    r,
+                                                                    messageExt.getQueueOffset(),
+                                                                    messageExt
+                                                                            .getStoreTimestamp())));
+                        } catch (Exception e) {
+                            throw new IOException(
+                                    "Failed to deserialize consumer record due to", e);
+                        } finally {
+                            collector.reset();
+                        }
+                    }
+                }
+            }
+        }
+        recordsBySplits.prepareForRead();
+        return recordsBySplits;
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<RocketMQPartitionSplit> splitsChange) {
+        // Get all the partition assignments and stopping timestamps..
+        if (!(splitsChange instanceof SplitsAddition)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "The SplitChange type of %s is not supported.",
+                            splitsChange.getClass()));
+        }
+        // Setup the stopping timestamps.
+        splitsChange
+                .splits()
+                .forEach(
+                        split -> {
+                            Tuple3<String, String, Integer> topicPartition =
+                                    new Tuple3<>(
+                                            split.getTopic(),
+                                            split.getBroker(),
+                                            split.getPartition());
+                            startingOffsets.put(topicPartition, split.getStartingOffset());
+                            stoppingTimestamps.put(topicPartition, split.getStoppingTimestamp());
+                        });
+    }
+
+    @Override
+    public void wakeUp() {
+        LOG.debug("Wake up the split reader in case the fetcher thread is blocking in fetch().");
+        wakeup = true;
+    }
+
+    @Override
+    public void close() {
+        consumer.shutdown();
+    }
+
+    private void finishSplitAtRecord(
+            Tuple3<String, String, Integer> topicPartition,
+            long stoppingTimestamp,
+            long currentOffset,
+            RocketMQPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits) {
+        LOG.debug(
+                "{} has reached stopping timestamp {}, current offset is {}",
+                topicPartition.f0 + "-" + topicPartition.f1,
+                stoppingTimestamp,
+                currentOffset);
+        recordsBySplits.addFinishedSplit(RocketMQPartitionSplit.toSplitId(topicPartition));
+        startingOffsets.remove(topicPartition);
+        stoppingTimestamps.remove(topicPartition);
+    }
+
+    private long getStoppingTimestamp(Tuple3<String, String, Integer> topicPartition) {
+        return stoppingTimestamps.getOrDefault(topicPartition, stopInMs);
+    }
+
+    // --------------- private helper method ----------------------
+
+    private void initialRocketMQConsumer(String consumerGroup) {
+        try {
+            consumer = new DefaultMQPullConsumer(consumerGroup);
+            consumer.start();
+        } catch (MQClientException e) {
+            LOG.error("Failed to initial RocketMQ consumer.", e);
+            consumer.shutdown();
+        }
+    }
+
+    // ---------------- private helper class ------------------------
+
+    private static class RocketMQPartitionSplitRecords<T> implements RecordsWithSplitIds<T> {
+        private final Map<String, Collection<T>> recordsBySplits;
+        private final Set<String> finishedSplits;
+        private Iterator<Map.Entry<String, Collection<T>>> splitIterator;
+        private String currentSplitId;
+        private Iterator<T> recordIterator;
+
+        public RocketMQPartitionSplitRecords() {
+            this.recordsBySplits = new HashMap<>();
+            this.finishedSplits = new HashSet<>();
+        }
+
+        private Collection<T> recordsForSplit(String splitId) {
+            return recordsBySplits.computeIfAbsent(splitId, id -> new ArrayList<>());
+        }
+
+        private void addFinishedSplit(String splitId) {
+            finishedSplits.add(splitId);
+        }
+
+        private void prepareForRead() {
+            splitIterator = recordsBySplits.entrySet().iterator();
+        }
+
+        @Override
+        @Nullable
+        public String nextSplit() {
+            if (splitIterator.hasNext()) {
+                Map.Entry<String, Collection<T>> entry = splitIterator.next();
+                currentSplitId = entry.getKey();
+                recordIterator = entry.getValue().iterator();
+                return currentSplitId;
+            } else {
+                currentSplitId = null;
+                recordIterator = null;
+                return null;
+            }
+        }
+
+        @Override
+        @Nullable
+        public T nextRecordFromSplit() {
+            Preconditions.checkNotNull(
+                    currentSplitId,
+                    "Make sure nextSplit() did not return null before "
+                            + "iterate over the records split.");
+            if (recordIterator.hasNext()) {
+                return recordIterator.next();
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public Set<String> finishedSplits() {
+            return finishedSplits;
+        }
+    }
+
+    private static class SimpleCollector<T> implements Collector<T> {
+        private final List<T> records = new ArrayList<>();
+
+        @Override
+        public void collect(T record) {
+            records.add(record);
+        }
+
+        @Override
+        public void close() {}
+
+        private List<T> getRecords() {
+            return records;
+        }
+
+        private void reset() {
+            records.clear();
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQRecordEmitter.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQRecordEmitter.java
new file mode 100644
index 0000000..25270b5
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQRecordEmitter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader;
+
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitState;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+
+/** The {@link RecordEmitter} implementation for {@link RocketMQSourceReader}. */
+public class RocketMQRecordEmitter<T>
+        implements RecordEmitter<Tuple3<T, Long, Long>, T, RocketMQPartitionSplitState> {
+
+    @Override
+    public void emitRecord(
+            Tuple3<T, Long, Long> element,
+            SourceOutput<T> output,
+            RocketMQPartitionSplitState splitState) {
+        output.collect(element.f0, element.f2);
+        splitState.setCurrentOffset(element.f1 + 1);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQSourceReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQSourceReader.java
new file mode 100644
index 0000000..0257e34
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQSourceReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader;
+
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitState;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+import java.util.Map;
+import java.util.function.Supplier;
+
+/** The source reader for RocketMQ partitions. */
+public class RocketMQSourceReader<T>
+        extends SingleThreadMultiplexSourceReaderBase<
+                Tuple3<T, Long, Long>, T, RocketMQPartitionSplit, RocketMQPartitionSplitState> {
+
+    public RocketMQSourceReader(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> elementsQueue,
+            Supplier<SplitReader<Tuple3<T, Long, Long>, RocketMQPartitionSplit>>
+                    splitReaderSupplier,
+            RecordEmitter<Tuple3<T, Long, Long>, T, RocketMQPartitionSplitState> recordEmitter,
+            Configuration config,
+            SourceReaderContext context) {
+        super(elementsQueue, splitReaderSupplier, recordEmitter, config, context);
+    }
+
+    @Override
+    protected void onSplitFinished(Map<String, RocketMQPartitionSplitState> map) {}
+
+    @Override
+    protected RocketMQPartitionSplitState initializedState(RocketMQPartitionSplit partitionSplit) {
+        return new RocketMQPartitionSplitState(partitionSplit);
+    }
+
+    @Override
+    protected RocketMQPartitionSplit toSplitType(
+            String splitId, RocketMQPartitionSplitState splitState) {
+        return splitState.toRocketMQPartitionSplit();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java
new file mode 100644
index 0000000..455f8af
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java
@@ -0,0 +1,43 @@
+package org.apache.rocketmq.flink.source.reader.deserializer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/** An interface for the deserialization of RocketMQ records. */
+public interface RocketMQRecordDeserializationSchema<T>
+        extends Serializable, ResultTypeQueryable<T> {
+
+    /**
+     * Initialization method for the schema. It is called before the actual working methods {@link
+     * #deserialize} and thus suitable for one time setup work.
+     *
+     * <p>The provided {@link InitializationContext} can be used to access additional features such
+     * as e.g. registering user metrics.
+     *
+     * @param context Contextual information that can be used during initialization.
+     */
+    @PublicEvolving
+    default void open(InitializationContext context) throws Exception {}
+
+    /**
+     * Deserializes the byte message.
+     *
+     * <p>Can output multiple records through the {@link Collector}. Note that number and size of
+     * the produced records should be relatively small. Depending on the source implementation
+     * records can be buffered in memory or collecting records might delay emitting checkpoint
+     * barrier.
+     *
+     * @param record The MessageExts to deserialize.
+     * @param out The collector to put the resulting messages.
+     */
+    @PublicEvolving
+    void deserialize(List<MessageExt> record, Collector<T> out) throws IOException;
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java
new file mode 100644
index 0000000..9bda60f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.split;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.Objects;
+
+/** A {@link SourceSplit} for a RocketMQ partition. */
+public class RocketMQPartitionSplit implements SourceSplit {
+
+    private final String topic;
+    private final String broker;
+    private final int partition;
+    private final long startingOffset;
+    private final long stoppingTimestamp;
+
+    public RocketMQPartitionSplit(
+            String topic,
+            String broker,
+            int partition,
+            long startingOffset,
+            long stoppingTimestamp) {
+        this.topic = topic;
+        this.broker = broker;
+        this.partition = partition;
+        this.startingOffset = startingOffset;
+        this.stoppingTimestamp = stoppingTimestamp;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public String getBroker() {
+        return broker;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public long getStartingOffset() {
+        return startingOffset;
+    }
+
+    public long getStoppingTimestamp() {
+        return stoppingTimestamp;
+    }
+
+    @Override
+    public String splitId() {
+        return topic + "-" + broker + "-" + partition;
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "[Topic: %s, Partition: %s, StartingOffset: %d, StoppingTimestamp: %d]",
+                topic, partition, startingOffset, stoppingTimestamp);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topic, partition, startingOffset, stoppingTimestamp);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof RocketMQPartitionSplit)) {
+            return false;
+        }
+        RocketMQPartitionSplit other = (RocketMQPartitionSplit) obj;
+        return topic.equals(other.topic)
+                && partition == other.partition
+                && startingOffset == other.startingOffset
+                && stoppingTimestamp == other.stoppingTimestamp;
+    }
+
+    public static String toSplitId(Tuple3<String, String, Integer> topicPartition) {
+        return topicPartition.f0 + "-" + topicPartition.f1 + "-" + topicPartition.f2;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitSerializer.java b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitSerializer.java
new file mode 100644
index 0000000..2363257
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitSerializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.split;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** The {@link SimpleVersionedSerializer serializer} for {@link RocketMQPartitionSplit}. */
+public class RocketMQPartitionSplitSerializer
+        implements SimpleVersionedSerializer<RocketMQPartitionSplit> {
+
+    private static final int CURRENT_VERSION = 0;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(RocketMQPartitionSplit split) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeUTF(split.getTopic());
+            out.writeUTF(split.getBroker());
+            out.writeInt(split.getPartition());
+            out.writeLong(split.getStartingOffset());
+            out.writeLong(split.getStoppingTimestamp());
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public RocketMQPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                DataInputStream in = new DataInputStream(bais)) {
+            String topic = in.readUTF();
+            String broker = in.readUTF();
+            int partition = in.readInt();
+            long offset = in.readLong();
+            long timestamp = in.readLong();
+            return new RocketMQPartitionSplit(topic, broker, partition, offset, timestamp);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitState.java b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitState.java
new file mode 100644
index 0000000..4fbb3da
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitState.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.split;
+
+/** This class extends RocketMQPartitionSplit to track a mutable current offset. */
+public class RocketMQPartitionSplitState extends RocketMQPartitionSplit {
+
+    private long currentOffset;
+
+    public RocketMQPartitionSplitState(RocketMQPartitionSplit partitionSplit) {
+        super(
+                partitionSplit.getTopic(),
+                partitionSplit.getBroker(),
+                partitionSplit.getPartition(),
+                partitionSplit.getStartingOffset(),
+                partitionSplit.getStoppingTimestamp());
+        this.currentOffset = partitionSplit.getStartingOffset();
+    }
+
+    public long getCurrentOffset() {
+        return currentOffset;
+    }
+
+    public void setCurrentOffset(long currentOffset) {
+        this.currentOffset = currentOffset;
+    }
+
+    /**
+     * Use the current offset as the starting offset to create a new RocketMQPartitionSplit.
+     *
+     * @return a new RocketMQPartitionSplit which uses the current offset as its starting offset.
+     */
+    public RocketMQPartitionSplit toRocketMQPartitionSplit() {
+        return new RocketMQPartitionSplit(
+                getTopic(),
+                getBroker(),
+                getPartition(),
+                getCurrentOffset(),
+                getStoppingTimestamp());
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java b/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java
deleted file mode 100644
index 6738ec3..0000000
--- a/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink;
-
-import java.util.Properties;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.flink.common.selector.DefaultTopicSelector;
-import org.apache.rocketmq.flink.common.selector.TopicSelector;
-import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema;
-import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueSerializationSchema;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.apache.rocketmq.flink.common.util.TestUtils.setFieldValue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-@Ignore
-public class RocketMQSinkTest {
-
-    private RocketMQSink rocketMQSink;
-    private DefaultMQProducer producer;
-
-    @Before
-    public void setUp() throws Exception {
-        KeyValueSerializationSchema serializationSchema = new SimpleKeyValueSerializationSchema("id", "name");
-        TopicSelector topicSelector = new DefaultTopicSelector("tpc");
-        Properties props = new Properties();
-        props.setProperty(RocketMQConfig.MSG_DELAY_LEVEL, String.valueOf(RocketMQConfig.MSG_DELAY_LEVEL04));
-        rocketMQSink = new RocketMQSink(props);
-
-        producer = mock(DefaultMQProducer.class);
-        setFieldValue(rocketMQSink, "producer", producer);
-    }
-
-    @Test
-    public void testSink() throws Exception {
-        Tuple2<String, String> tuple = new Tuple2<>("id", "province");
-        String topic = "testTopic";
-        String tag = "testTag";
-        Message message = new Message(topic, tag, tuple.f0, tuple.f1.getBytes());
-    }
-
-    @Test
-    public void close() throws Exception {
-        rocketMQSink.close();
-
-        verify(producer).shutdown();
-    }
-
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java
deleted file mode 100644
index 2f4685c..0000000
--- a/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.selector;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class DefaultTopicSelectorTest {
-    @Test
-    public void getTopic() throws Exception {
-        DefaultTopicSelector selector = new DefaultTopicSelector("rocket");
-        assertEquals("rocket", selector.getTopic(null));
-        assertEquals("", selector.getTag(null));
-
-        selector = new DefaultTopicSelector("rocket", "tg");
-        assertEquals("rocket", selector.getTopic(null));
-        assertEquals("tg", selector.getTag(null));
-    }
-
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java
deleted file mode 100644
index 6ac1a57..0000000
--- a/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.selector;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class SimpleTopicSelectorTest {
-    @Test
-    public void getTopic() throws Exception {
-        SimpleTopicSelector selector = new SimpleTopicSelector("tpc", "dtpc", "tg", "dtg");
-        Map tuple = new HashMap();
-        tuple.put("id", "x001");
-        tuple.put("name", "vesense");
-        tuple.put("tpc", "tpc1");
-        tuple.put("tg", "tg1");
-
-        assertEquals("tpc1", selector.getTopic(tuple));
-        assertEquals("tg1", selector.getTag(tuple));
-
-        tuple = new HashMap();
-        tuple.put("id", "x001");
-        tuple.put("name", "vesense");
-
-        assertEquals("dtpc", selector.getTopic(tuple));
-        assertEquals("dtg", selector.getTag(tuple));
-    }
-
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java
deleted file mode 100644
index 98aa793..0000000
--- a/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flink.common.serialization;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class SimpleKeyValueSerializationSchemaTest {
-    @Test
-    public void serializeKeyAndValue() throws Exception {
-        SimpleKeyValueSerializationSchema serializationSchema = new SimpleKeyValueSerializationSchema("id", "name");
-        SimpleKeyValueDeserializationSchema deserializationSchema = new SimpleKeyValueDeserializationSchema("id", "name");
-
-        Map tuple = new HashMap();
-        tuple.put("id", "x001");
-        tuple.put("name", "vesense");
-
-        assertEquals(tuple, deserializationSchema.deserializeKeyAndValue(serializationSchema.serializeKey(tuple),
-            serializationSchema.serializeValue(tuple)));
-    }
-
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java
new file mode 100644
index 0000000..c45dbdf
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.flink.legacy.common.selector.DefaultTopicSelector;
+import org.apache.rocketmq.flink.legacy.common.selector.TopicSelector;
+import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueSerializationSchema;
+import org.apache.rocketmq.flink.legacy.common.serialization.SimpleKeyValueSerializationSchema;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.apache.rocketmq.flink.legacy.common.util.TestUtils.setFieldValue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@Ignore
+public class RocketMQSinkTest {
+
+    private RocketMQSink rocketMQSink;
+    private DefaultMQProducer producer;
+
+    @Before
+    public void setUp() throws Exception {
+        KeyValueSerializationSchema serializationSchema =
+                new SimpleKeyValueSerializationSchema("id", "name");
+        TopicSelector topicSelector = new DefaultTopicSelector("tpc");
+        Properties props = new Properties();
+        props.setProperty(
+                RocketMQConfig.MSG_DELAY_LEVEL, String.valueOf(RocketMQConfig.MSG_DELAY_LEVEL04));
+        rocketMQSink = new RocketMQSink(props);
+
+        producer = mock(DefaultMQProducer.class);
+        setFieldValue(rocketMQSink, "producer", producer);
+    }
+
+    @Test
+    public void testSink() throws Exception {
+        Tuple2<String, String> tuple = new Tuple2<>("id", "province");
+        String topic = "testTopic";
+        String tag = "testTag";
+        Message message = new Message(topic, tag, tuple.f0, tuple.f1.getBytes());
+    }
+
+    @Test
+    public void close() throws Exception {
+        rocketMQSink.close();
+
+        verify(producer).shutdown();
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
similarity index 77%
rename from src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java
rename to src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
index 2f16a96..a863ddd 100644
--- a/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
@@ -1,43 +1,42 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.flink.legacy;
 
-package org.apache.rocketmq.flink;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema;
-import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema;
+import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema;
+import org.apache.rocketmq.flink.legacy.common.serialization.SimpleKeyValueDeserializationSchema;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import static org.apache.rocketmq.flink.common.util.TestUtils.setFieldValue;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.rocketmq.flink.legacy.common.util.TestUtils.setFieldValue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
@@ -87,7 +86,8 @@ public class RocketMQSourceTest {
         PullResult pullResult = new PullResult(PullStatus.FOUND, 3, 1, 5, msgFoundList);
 
         when(consumer.fetchConsumeOffset(any(MessageQueue.class), anyBoolean())).thenReturn(2L);
-        when(consumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenReturn(pullResult);
+        when(consumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt()))
+                .thenReturn(pullResult);
 
         SourceContext context = mock(SourceContext.class);
         when(context.getCheckpointLock()).thenReturn(new Object());
@@ -118,4 +118,4 @@ public class RocketMQSourceTest {
             return false;
         }
     }
-}
\ No newline at end of file
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java
new file mode 100644
index 0000000..b235c63
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.selector;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class DefaultTopicSelectorTest {
+    @Test
+    public void getTopic() throws Exception {
+        DefaultTopicSelector selector = new DefaultTopicSelector("rocket");
+        assertEquals("rocket", selector.getTopic(null));
+        assertEquals("", selector.getTag(null));
+
+        selector = new DefaultTopicSelector("rocket", "tg");
+        assertEquals("rocket", selector.getTopic(null));
+        assertEquals("tg", selector.getTag(null));
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java
new file mode 100644
index 0000000..5c0f755
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.selector;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class SimpleTopicSelectorTest {
+    @Test
+    public void getTopic() throws Exception {
+        SimpleTopicSelector selector = new SimpleTopicSelector("tpc", "dtpc", "tg", "dtg");
+        Map tuple = new HashMap();
+        tuple.put("id", "x001");
+        tuple.put("name", "vesense");
+        tuple.put("tpc", "tpc1");
+        tuple.put("tg", "tg1");
+
+        assertEquals("tpc1", selector.getTopic(tuple));
+        assertEquals("tg1", selector.getTag(tuple));
+
+        tuple = new HashMap();
+        tuple.put("id", "x001");
+        tuple.put("name", "vesense");
+
+        assertEquals("dtpc", selector.getTopic(tuple));
+        assertEquals("dtg", selector.getTag(tuple));
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
new file mode 100644
index 0000000..78baf20
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class SimpleKeyValueSerializationSchemaTest {
+    @Test
+    public void serializeKeyAndValue() throws Exception {
+        SimpleKeyValueSerializationSchema serializationSchema =
+                new SimpleKeyValueSerializationSchema("id", "name");
+        SimpleKeyValueDeserializationSchema deserializationSchema =
+                new SimpleKeyValueDeserializationSchema("id", "name");
+
+        Map tuple = new HashMap();
+        tuple.put("id", "x001");
+        tuple.put("name", "vesense");
+
+        assertEquals(
+                tuple,
+                deserializationSchema.deserializeKeyAndValue(
+                        serializationSchema.serializeKey(tuple),
+                        serializationSchema.serializeValue(tuple)));
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumStateSerializerTest.java b/src/test/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumStateSerializerTest.java
new file mode 100644
index 0000000..45ff0e3
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumStateSerializerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.enumerator;
+
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link RocketMQSourceEnumStateSerializer}. */
+public class RocketMQSourceEnumStateSerializerTest {
+
+    @Test
+    public void testSerializeDeserializeSourceEnumState() throws IOException {
+        RocketMQSourceEnumStateSerializer serializer = new RocketMQSourceEnumStateSerializer();
+        RocketMQSourceEnumState expected = prepareSourceEnumeratorState();
+        RocketMQSourceEnumState actual = serializer.deserialize(0, serializer.serialize(expected));
+        assertEquals(expected.getCurrentAssignment(), actual.getCurrentAssignment());
+    }
+
+    private RocketMQSourceEnumState prepareSourceEnumeratorState() {
+        SplitsAssignment<RocketMQPartitionSplit> pendingAssignment =
+                new SplitsAssignment<>(new HashMap<>());
+        pendingAssignment
+                .assignment()
+                .put(
+                        0,
+                        Arrays.asList(
+                                new RocketMQPartitionSplit(
+                                        "0", "taobaodaily-01", 1, 0, System.currentTimeMillis()),
+                                new RocketMQPartitionSplit(
+                                        "3", "taobaodaily-01", 2, 0, System.currentTimeMillis()),
+                                new RocketMQPartitionSplit(
+                                        "6", "taobaodaily-01", 3, 0, System.currentTimeMillis()),
+                                new RocketMQPartitionSplit(
+                                        "9", "taobaodaily-01", 4, 0, System.currentTimeMillis())));
+        pendingAssignment
+                .assignment()
+                .put(
+                        1,
+                        Arrays.asList(
+                                new RocketMQPartitionSplit(
+                                        "1", "taobaodaily-02", 5, 0, System.currentTimeMillis()),
+                                new RocketMQPartitionSplit(
+                                        "4", "taobaodaily-02", 6, 0, System.currentTimeMillis()),
+                                new RocketMQPartitionSplit(
+                                        "7", "taobaodaily-02", 7, 0, System.currentTimeMillis())));
+        pendingAssignment
+                .assignment()
+                .put(
+                        2,
+                        Arrays.asList(
+                                new RocketMQPartitionSplit(
+                                        "2", "taobaodaily-03", 8, 0, System.currentTimeMillis()),
+                                new RocketMQPartitionSplit(
+                                        "5", "taobaodaily-03", 9, 0, System.currentTimeMillis()),
+                                new RocketMQPartitionSplit(
+                                        "8", "taobaodaily-03", 10, 0, System.currentTimeMillis())));
+        return new RocketMQSourceEnumState(pendingAssignment.assignment());
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/source/reader/RocketMQRecordEmitterTest.java b/src/test/java/org/apache/rocketmq/flink/source/reader/RocketMQRecordEmitterTest.java
new file mode 100644
index 0000000..83c1c4b
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/source/reader/RocketMQRecordEmitterTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
+import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitState;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link RocketMQRecordEmitter}. */
+public class RocketMQRecordEmitterTest {
+
+    @Test
+    public void testEmitRecord() {
+        RocketMQRecordEmitter<RowData> recordEmitter = new RocketMQRecordEmitter<>();
+        MessageExt message =
+                new MessageExt(
+                        1,
+                        System.currentTimeMillis(),
+                        InetSocketAddress.createUnresolved("localhost", 8080),
+                        System.currentTimeMillis(),
+                        InetSocketAddress.createUnresolved("localhost", 8088),
+                        "184019387");
+        message.setBody("test_emit_record_message".getBytes());
+        GenericRowData rowData = new GenericRowData(1);
+        rowData.setField(0, message.getBody());
+        String topic = "test-record-emitter";
+        String broker = "taobaodaily";
+        int partition = 256;
+        long startingOffset = 100;
+        long stoppingTimestamp = System.currentTimeMillis();
+        Tuple3<RowData, Long, Long> record =
+                new Tuple3<>(rowData, 100L, System.currentTimeMillis());
+        RocketMQPartitionSplitState partitionSplitState =
+                new RocketMQPartitionSplitState(
+                        new RocketMQPartitionSplit(
+                                topic, broker, partition, startingOffset, stoppingTimestamp));
+        recordEmitter.emitRecord(record, new TestingEmitterOutput<>(), partitionSplitState);
+        assertEquals(
+                new RocketMQPartitionSplit(
+                        topic, broker, partition, startingOffset + 1, stoppingTimestamp),
+                partitionSplitState.toRocketMQPartitionSplit());
+    }
+
+    private static final class TestingEmitterOutput<E> implements ReaderOutput<E> {
+
+        private TestingEmitterOutput() {}
+
+        public void collect(E record) {}
+
+        public void collect(E record, long timestamp) {
+            this.collect(record);
+        }
+
+        public void emitWatermark(Watermark watermark) {
+            throw new UnsupportedOperationException();
+        }
+
+        public void markIdle() {
+            throw new UnsupportedOperationException();
+        }
+
+        public SourceOutput<E> createOutputForSplit(String splitId) {
+            return this;
+        }
+
+        public void releaseOutputForSplit(String splitId) {}
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitSerializerTest.java b/src/test/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitSerializerTest.java
new file mode 100644
index 0000000..b56cc9d
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplitSerializerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.split;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link RocketMQPartitionSplitSerializer}. */
+public class RocketMQPartitionSplitSerializerTest {
+
+    @Test
+    public void testSerializePartitionSplit() throws IOException {
+        RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer();
+        RocketMQPartitionSplit expected =
+                new RocketMQPartitionSplit(
+                        "test-split-serialization",
+                        "taobaodaily",
+                        256,
+                        100,
+                        System.currentTimeMillis());
+        RocketMQPartitionSplit actual =
+                serializer.deserialize(serializer.getVersion(), serializer.serialize(expected));
+        assertEquals(expected, actual);
+    }
+}