You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:42 UTC

[rocketmq-flink] 29/33: [#715] Support the RocketMQ TableSource based on the legacy Source implementation (#779)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 4edc4daf5e7f6dca981744a6f3f963103916d397
Author: SteNicholas <pr...@163.com>
AuthorDate: Wed Aug 18 15:03:04 2021 +0800

    [#715] Support the RocketMQ TableSource based on the legacy Source implementation (#779)
---
 pom.xml                                            |   4 +-
 .../rocketmq/flink/legacy/RocketMQConfig.java      |  22 +-
 .../apache/rocketmq/flink/legacy/RocketMQSink.java |  22 +-
 ...etMQSource.java => RocketMQSourceFunction.java} |  28 +-
 .../rocketmq/flink/legacy/RunningChecker.java      |  22 +-
 .../common/selector/DefaultTopicSelector.java      |  22 +-
 .../common/selector/SimpleTopicSelector.java       |  22 +-
 .../legacy/common/selector/TopicSelector.java      |  22 +-
 .../KeyValueDeserializationSchema.java             |  22 +-
 .../serialization/KeyValueSerializationSchema.java |  22 +-
 .../RowKeyValueDeserializationSchema.java          | 407 +++++++++++++++++++++
 .../SimpleKeyValueDeserializationSchema.java       |  22 +-
 .../SimpleKeyValueSerializationSchema.java         |  22 +-
 .../flink/legacy/common/util/MetricUtils.java      |  22 +-
 .../flink/legacy/common/util/RetryUtil.java        |  22 +-
 .../flink/legacy/common/util/RocketMQUtils.java    |  22 +-
 .../flink/legacy/common/util/TestUtils.java        |  22 +-
 .../flink/legacy/example/RocketMQFlinkExample.java |   5 +-
 .../flink/source/common/RocketMQOptions.java       |   3 +
 .../deserializer/RowDeserializationSchema.java     |   2 +-
 .../table/RocketMQDynamicTableSourceFactory.java   |   6 +-
 .../source/table/RocketMQScanTableSource.java      |  65 +++-
 .../rocketmq/flink/legacy/RocketMQSinkTest.java    |  24 +-
 .../rocketmq/flink/legacy/RocketMQSourceTest.java  |  28 +-
 .../common/selector/DefaultTopicSelectorTest.java  |  24 +-
 .../common/selector/SimpleTopicSelectorTest.java   |  24 +-
 .../RowKeyValueDeserializationSchemaTest.java      |  50 +++
 .../SimpleKeyValueSerializationSchemaTest.java     |  24 +-
 .../RocketMQDynamicTableSourceFactoryTest.java     | 111 ++++++
 29 files changed, 887 insertions(+), 226 deletions(-)

diff --git a/pom.xml b/pom.xml
index abec905..0675bb7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <rocketmq.version>4.7.1</rocketmq.version>
-        <flink.version>1.13.0</flink.version>
+        <flink.version>1.13.1</flink.version>
         <commons-lang.version>2.5</commons-lang.version>
         <scala.binary.version>2.11</scala.binary.version>
         <spotless.version>2.4.2</spotless.version>
@@ -45,11 +45,13 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-java</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
index 5c19b7a..fc257a1 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
index f91a684..b6e1793 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
similarity index 96%
rename from src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 84260b6..8821a6d 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy;
@@ -82,12 +84,12 @@ import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getLong
  * guarantees when checkpoints are enabled. Otherwise, the source doesn't provide any reliability
  * guarantees.
  */
-public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
+public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
         implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
 
     private static final long serialVersionUID = 1L;
 
-    private static final Logger log = LoggerFactory.getLogger(RocketMQSource.class);
+    private static final Logger log = LoggerFactory.getLogger(RocketMQSourceFunction.class);
     private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
     private RunningChecker runningChecker;
     private transient DefaultMQPullConsumer consumer;
@@ -115,7 +117,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
 
     private Meter tpsMetric;
 
-    public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
+    public RocketMQSourceFunction(KeyValueDeserializationSchema<OUT> schema, Properties props) {
         this.schema = schema;
         this.props = props;
     }
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java b/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java
index c48361a..f36b727 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java
index 6be5218..128b19e 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy.common.selector;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java
index 674b5a0..dcdaa2f 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy.common.selector;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java
index 581dadc..a70c599 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy.common.selector;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java
index 4cc8c61..8d0c778 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy.common.serialization;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java
index 66b2e29..0000772 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy.common.serialization;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
new file mode 100644
index 0000000..bc43b1c
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.rocketmq.flink.source.reader.deserializer.DirtyDataStrategy;
+import org.apache.rocketmq.flink.source.util.ByteSerializer;
+import org.apache.rocketmq.flink.source.util.StringSerializer;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * The row based implementation of {@link KeyValueDeserializationSchema} for the deserialization
+ * of message key and value.
+ */
+public class RowKeyValueDeserializationSchema implements KeyValueDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = -1L;
+    private static final Logger logger =
+            LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class);
+
+    // Schema of the produced rows. Read by getProducedType() and isByteArrayType().
+    // NOTE(review): the field is transient but still dereferenced by those methods; if an
+    // instance is ever Java-serialized this would be null afterwards - confirm.
+    private transient TableSchema tableSchema;
+    // Strategy consulted by handleException() when a single field cannot be converted.
+    private final DirtyDataStrategy formatErrorStrategy;
+    // Strategy consulted by handleFieldMissing() when a body has fewer fields than columns.
+    private final DirtyDataStrategy fieldMissingStrategy;
+    // Presumably the counterpart for bodies with more fields than columns
+    // (handleFieldIncrement) - TODO confirm against that method.
+    private final DirtyDataStrategy fieldIncrementStrategy;
+    // Charset name used to decode the message body into a String.
+    private final String encoding;
+    // Delimiter passed to StringUtils.splitPreserveAllTokens; stored after unescapeJava().
+    private final String fieldDelimiter;
+    // When true, dirty-data warnings are logged every time instead of being rate-limited.
+    private final boolean columnErrorDebug;
+    // Number of fields in tableSchema.
+    private final int columnSize;
+    // Per-column value type derived from the schema's legacy type information.
+    private final ByteSerializer.ValueType[] fieldTypes;
+    // Per-column data types taken from tableSchema.getFieldDataTypes().
+    // NOTE(review): transient as well, yet indexed in deserializeValue() - same caveat as
+    // tableSchema above; confirm.
+    private final transient DataType[] fieldDataTypes;
+    // Field name -> column index.
+    private final Map<String, Integer> columnIndexMapping;
+    // Timestamp (ms) of the last rate-limited warning emitted by handleException().
+    private long lastLogExceptionTime;
+    // Timestamp (ms) of the last rate-limited warning emitted by handleFieldMissing().
+    private long lastLogHandleFieldTime;
+
+    // Minimum gap between two rate-limited warnings: 60 seconds.
+    private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000;
+
+    /**
+     * Builds a schema that converts delimited message bodies into rows laid out like {@code
+     * tableSchema}.
+     *
+     * @param tableSchema supplies the field names, legacy type information and data types
+     * @param formatErrorStrategy strategy applied when a single field fails to convert
+     * @param fieldMissingStrategy strategy applied when a body has fewer fields than columns
+     * @param fieldIncrementStrategy strategy stored for bodies with more fields than columns
+     * @param encoding charset name used to decode message bodies
+     * @param fieldDelimiter Java-escaped delimiter; it is unescaped before being stored
+     * @param columnErrorDebug when true, dirty-data warnings are not rate-limited
+     * @param properties copied into a local {@link DescriptorProperties}
+     */
+    public RowKeyValueDeserializationSchema(
+            TableSchema tableSchema,
+            DirtyDataStrategy formatErrorStrategy,
+            DirtyDataStrategy fieldMissingStrategy,
+            DirtyDataStrategy fieldIncrementStrategy,
+            String encoding,
+            String fieldDelimiter,
+            boolean columnErrorDebug,
+            Map<String, String> properties) {
+        this.tableSchema = tableSchema;
+        this.formatErrorStrategy = formatErrorStrategy;
+        this.fieldMissingStrategy = fieldMissingStrategy;
+        this.fieldIncrementStrategy = fieldIncrementStrategy;
+        this.columnErrorDebug = columnErrorDebug;
+        this.encoding = encoding;
+        this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
+
+        // Fetch the name array once instead of once per loop iteration.
+        String[] names = tableSchema.getFieldNames();
+        this.columnSize = names.length;
+        this.fieldTypes = new ByteSerializer.ValueType[columnSize];
+        this.columnIndexMapping = new HashMap<>();
+        for (int i = 0; i < columnSize; i++) {
+            this.columnIndexMapping.put(names[i], i);
+        }
+
+        // Same for the legacy type information used to pick the per-column value type.
+        TypeInformation<?>[] legacyTypes = tableSchema.getFieldTypes();
+        for (int i = 0; i < columnSize; i++) {
+            this.fieldTypes[i] = ByteSerializer.getTypeIndex(legacyTypes[i].getTypeClass());
+        }
+
+        // The descriptor is populated here but not read again within this constructor.
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(properties);
+        this.fieldDataTypes = tableSchema.getFieldDataTypes();
+        // Start both rate-limit windows at construction time.
+        this.lastLogExceptionTime = System.currentTimeMillis();
+        this.lastLogHandleFieldTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Converts one message into a row. The {@code key} argument is not used.
+     *
+     * <p>When the schema consists of a single byte-array column, {@code value} is stored as-is
+     * in field 0 (even if it is {@code null}). Otherwise a {@code null} value is logged and
+     * dropped, and any other value is handed to {@link #deserializeValue(byte[])}.
+     *
+     * @param key message key, ignored
+     * @param value message body
+     * @return the row, or {@code null} when the message is dropped
+     */
+    @Override
+    public RowData deserializeKeyAndValue(byte[] key, byte[] value) {
+        if (!isOnlyHaveVarbinaryDataField()) {
+            if (value != null) {
+                return deserializeValue(value);
+            }
+            logger.info("Deserialize empty BytesMessage body, ignore the empty message.");
+            return null;
+        }
+        // Single binary column: pass the raw bytes through untouched.
+        GenericRowData binaryRow = new GenericRowData(columnSize);
+        binaryRow.setField(0, value);
+        return binaryRow;
+    }
+
+    /**
+     * Describes the rows this schema emits, derived from the row data type of {@code
+     * tableSchema}.
+     *
+     * @return internal type information for the schema's row type
+     */
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
+        return InternalTypeInfo.of(rowType);
+    }
+
+    /**
+     * Tells whether the schema is exactly one column and that column is a byte array.
+     *
+     * @return {@code true} only for a single byte-array column
+     */
+    private boolean isOnlyHaveVarbinaryDataField() {
+        // Short-circuit keeps the schema lookup from running for multi-column tables.
+        return columnSize == 1 && isByteArrayType(tableSchema.getFieldNames()[0]);
+    }
+
+    /**
+     * Decodes the body with the configured encoding, splits it on the field delimiter and
+     * converts every token to its column type.
+     *
+     * <p>A single-column schema receives the whole body as its only token. Bodies with too few
+     * or too many tokens are first routed through {@code handleFieldMissing} / {@code
+     * handleFieldIncrement}; a {@code null} result from either drops the record.
+     *
+     * @param value non-null message body
+     * @return the populated row, or {@code null} when the record is dropped
+     * @throws RuntimeException if the configured encoding is not supported
+     */
+    private RowData deserializeValue(byte[] value) {
+        String body;
+        try {
+            body = new String(value, encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+
+        String[] tokens =
+                columnSize == 1
+                        ? new String[] {body}
+                        : StringUtils.splitPreserveAllTokens(body, fieldDelimiter);
+        if (tokens.length < columnSize) {
+            tokens = handleFieldMissing(tokens);
+        } else if (tokens.length > columnSize) {
+            tokens = handleFieldIncrement(tokens);
+        }
+        if (tokens == null) {
+            return null;
+        }
+
+        GenericRowData row = new GenericRowData(columnSize);
+        // Overwritten by every failing column, so the last failure's verdict is the one kept.
+        boolean dropRow = false;
+        for (int column = 0; column < columnSize; column++) {
+            try {
+                String raw = getValue(tokens, body, column);
+                Object converted =
+                        StringSerializer.deserialize(
+                                raw, fieldTypes[column], fieldDataTypes[column], new HashSet<>());
+                row.setField(column, converted);
+            } catch (Exception e) {
+                dropRow = handleException(row, column, tokens, e);
+            }
+        }
+        return dropRow ? null : row;
+    }
+
+    /**
+     * Picks the raw text for one column.
+     *
+     * @param data tokens obtained from the body
+     * @param line the undivided body
+     * @param index column position
+     * @return {@code line} for a single-column schema; otherwise {@code data[index]}, or {@code
+     *     null} when {@code index} lies beyond the end of {@code data}
+     */
+    private String getValue(String[] data, String line, int index) {
+        if (columnSize == 1) {
+            return line;
+        }
+        return index < data.length ? data[index] : null;
+    }
+
+    /**
+     * Checks whether the named column maps to {@code ByteSerializer.ValueType.V_ByteArray}.
+     *
+     * @param fieldName a field name present in the column index mapping
+     * @return {@code true} if the column's value type is a byte array; {@code false} if it is
+     *     another type or the schema has no type information for it
+     */
+    private boolean isByteArrayType(String fieldName) {
+        TypeInformation<?>[] legacyTypes = tableSchema.getFieldTypes();
+        TypeInformation<?> columnType = legacyTypes[columnIndexMapping.get(fieldName)];
+        if (columnType == null) {
+            return false;
+        }
+        return ByteSerializer.getTypeIndex(columnType.getTypeClass())
+                == ByteSerializer.ValueType.V_ByteArray;
+    }
+
+    /**
+     * Applies {@code formatErrorStrategy} to a field that could not be converted.
+     *
+     * <ul>
+     *   <li>{@code SKIP}: logs a warning (at most once per {@code DEFAULT_LOG_INTERVAL_MS}
+     *       unless {@code columnErrorDebug} is set) and asks the caller to drop the row.
+     *   <li>{@code SKIP_SILENT}: asks the caller to drop the row without logging.
+     *   <li>{@code EXCEPTION}: rethrows the failure wrapped in a {@link RuntimeException}.
+     *   <li>{@code CUT}, {@code NULL}, {@code PAD} and any other value: sets the field to
+     *       {@code null} and keeps the row.
+     * </ul>
+     *
+     * @param row row under construction
+     * @param index column whose conversion failed
+     * @param data tokens of the current record
+     * @param e the conversion failure
+     * @return {@code true} if the caller should drop the row
+     * @throws RuntimeException wrapping {@code e} under the {@code EXCEPTION} strategy
+     */
+    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
+        boolean skip = false;
+        switch (formatErrorStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
+                    // getValue() tolerates index >= data.length, so the log line must too;
+                    // otherwise building the message would throw instead of skipping the row.
+                    Object fieldData = index < data.length ? data[index] : null;
+                    logger.warn(
+                            "Data format error, field type: "
+                                    + fieldTypes[index]
+                                    + ", field data: "
+                                    + fieldData
+                                    + ", index: "
+                                    + index
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]",
+                            e);
+                    lastLogExceptionTime = now;
+                }
+                skip = true;
+                break;
+            case SKIP_SILENT:
+                skip = true;
+                break;
+            case EXCEPTION:
+                throw new RuntimeException(e);
+            case CUT:
+            case NULL:
+            case PAD:
+            default:
+                row.setField(index, null);
+                break;
+        }
+
+        return skip;
+    }
+
+    /** Handles a record with too few fields: returns it, null to drop it, or throws. */
+    private String[] handleFieldMissing(String[] data) {
+        String message =
+                String.format(
+                        "Field missing exception, table column number: %d, data column number: %d, data field number: %d, data: [%s].",
+                        columnSize, columnSize, data.length, StringUtils.join(data, ","));
+        switch (fieldMissingStrategy) {
+            case SKIP_SILENT:
+                return null;
+            case CUT:
+            case NULL:
+            case PAD:
+                return data;
+            case EXCEPTION:
+                logger.error(message);
+                throw new RuntimeException(message);
+            default: // SKIP, and any strategy not listed above
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(message);
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+        }
+    }
+
+    /** Handles a record with too many fields: returns it, null to drop it, or throws. */
+    private String[] handleFieldIncrement(String[] data) {
+        String message =
+                String.format(
+                        "Field increment exception, table column number: %d, data column number: %d, data field number: %d, data: [%s].",
+                        columnSize, columnSize, data.length, StringUtils.join(data, ","));
+        switch (fieldIncrementStrategy) {
+            case SKIP:
+                // Warn only once the log interval has elapsed, or always with column error debug.
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(message);
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            case EXCEPTION:
+                logger.error(message);
+                throw new RuntimeException(message);
+            default: // CUT, NULL, PAD, and any strategy not listed above
+                // The record is returned untouched; no field is removed here.
+                return data;
+        }
+    }
+
+    /** Builder of {@link RowKeyValueDeserializationSchema}. */
+    public static class Builder {
+
+        private TableSchema schema;
+        private DirtyDataStrategy formatErrorStrategy = DirtyDataStrategy.SKIP;
+        private DirtyDataStrategy fieldMissingStrategy = DirtyDataStrategy.SKIP;
+        private DirtyDataStrategy fieldIncrementStrategy = DirtyDataStrategy.CUT;
+        private String encoding = "UTF-8";
+        private String fieldDelimiter = "\u0001";
+        private boolean columnErrorDebug = false;
+        private Map<String, String> properties;
+
+        public Builder() {}
+
+        public Builder setTableSchema(TableSchema tableSchema) {
+            this.schema = tableSchema;
+            return this;
+        }
+
+        public Builder setFormatErrorStrategy(DirtyDataStrategy formatErrorStrategy) {
+            this.formatErrorStrategy = formatErrorStrategy;
+            return this;
+        }
+
+        public Builder setFieldMissingStrategy(DirtyDataStrategy fieldMissingStrategy) {
+            this.fieldMissingStrategy = fieldMissingStrategy;
+            return this;
+        }
+
+        public Builder setFieldIncrementStrategy(DirtyDataStrategy fieldIncrementStrategy) {
+            this.fieldIncrementStrategy = fieldIncrementStrategy;
+            return this;
+        }
+
+        public Builder setEncoding(String encoding) {
+            this.encoding = encoding;
+            return this;
+        }
+
+        public Builder setFieldDelimiter(String fieldDelimiter) {
+            this.fieldDelimiter = fieldDelimiter;
+            return this;
+        }
+
+        public Builder setColumnErrorDebug(boolean columnErrorDebug) {
+            this.columnErrorDebug = columnErrorDebug;
+            return this;
+        }
+
+        /**
+         * Stores {@code properties} and, if non-null, applies the {@link CollectorOption} values.
+         *
+         * <p>A recognised length check replaces all three strategies. Encoding, field delimiter and
+         * column error debug are always overwritten, using the option defaults for absent keys.
+         */
+        public Builder setProperties(Map<String, String> properties) {
+            this.properties = properties;
+            if (null == properties) {
+                return this;
+            }
+            Configuration configuration = new Configuration();
+            for (Map.Entry<String, String> entry : properties.entrySet()) {
+                configuration.setString(entry.getKey(), entry.getValue());
+            }
+            String lengthCheck = configuration.get(CollectorOption.LENGTH_CHECK);
+            // Locale.ROOT keeps the match independent of the JVM's default locale.
+            switch (lengthCheck.toUpperCase(java.util.Locale.ROOT)) {
+                case "SKIP":
+                    this.setFormatErrorStrategy(DirtyDataStrategy.SKIP);
+                    this.setFieldMissingStrategy(DirtyDataStrategy.SKIP);
+                    this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP);
+                    break;
+                case "PAD":
+                    this.setFormatErrorStrategy(DirtyDataStrategy.SKIP);
+                    this.setFieldMissingStrategy(DirtyDataStrategy.PAD);
+                    this.setFieldIncrementStrategy(DirtyDataStrategy.CUT);
+                    break;
+                case "EXCEPTION":
+                    this.setFormatErrorStrategy(DirtyDataStrategy.EXCEPTION);
+                    this.setFieldMissingStrategy(DirtyDataStrategy.EXCEPTION);
+                    this.setFieldIncrementStrategy(DirtyDataStrategy.EXCEPTION);
+                    break;
+                case "SKIP_SILENT":
+                    this.setFormatErrorStrategy(DirtyDataStrategy.SKIP_SILENT);
+                    this.setFieldMissingStrategy(DirtyDataStrategy.SKIP_SILENT);
+                    this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP_SILENT);
+                    break;
+                default: // any other value leaves the current strategies in place
+                    break;
+            }
+            this.setEncoding(configuration.getString(CollectorOption.ENCODING));
+            this.setFieldDelimiter(configuration.getString(CollectorOption.FIELD_DELIMITER));
+            this.setColumnErrorDebug(configuration.getBoolean(CollectorOption.COLUMN_ERROR_DEBUG));
+            return this;
+        }
+
+        public RowKeyValueDeserializationSchema build() {
+            return new RowKeyValueDeserializationSchema(
+                    schema,
+                    formatErrorStrategy,
+                    fieldMissingStrategy,
+                    fieldIncrementStrategy,
+                    encoding,
+                    fieldDelimiter,
+                    columnErrorDebug,
+                    properties);
+        }
+    }
+
+    /** Options read by {@link Builder#setProperties(Map)}; every key is lower-case. */
+    public static class CollectorOption {
+        public static final ConfigOption<String> ENCODING =
+                ConfigOptions.key("encoding").stringType().defaultValue("UTF-8");
+        public static final ConfigOption<String> FIELD_DELIMITER =
+                ConfigOptions.key("fielddelimiter").stringType().defaultValue("\u0001");
+        public static final ConfigOption<Boolean> COLUMN_ERROR_DEBUG =
+                ConfigOptions.key("columnerrordebug").booleanType().defaultValue(true);
+        public static final ConfigOption<String> LENGTH_CHECK =
+                ConfigOptions.key("lengthcheck").stringType().defaultValue("NONE");
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
index 7dada93..456f477 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy.common.serialization;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java
index 3e92ad2..c3ae600 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy.common.serialization;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
index bb3baeb..a54a29a 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy.common.util;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
index 7ec1dca..e53caf1 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy.common.util;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java
index 94a24a1..1f084a8 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy.common.util;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
index 407aec7..70c26d9 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.rocketmq.flink.legacy.common.util;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java
index b435726..fc0d3cb 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.flink.legacy.example;
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.flink.legacy.RocketMQConfig;
 import org.apache.rocketmq.flink.legacy.RocketMQSink;
-import org.apache.rocketmq.flink.legacy.RocketMQSource;
+import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
 import org.apache.rocketmq.flink.legacy.common.serialization.SimpleTupleDeserializationSchema;
 import org.apache.rocketmq.flink.legacy.function.SinkMapFunction;
 import org.apache.rocketmq.flink.legacy.function.SourceMapFunction;
@@ -114,7 +114,8 @@ public class RocketMQFlinkExample {
         SimpleTupleDeserializationSchema schema = new SimpleTupleDeserializationSchema();
 
         DataStreamSource<Tuple2<String, String>> source =
-                env.addSource(new RocketMQSource<>(schema, consumerProps)).setParallelism(2);
+                env.addSource(new RocketMQSourceFunction<>(schema, consumerProps))
+                        .setParallelism(2);
 
         source.print();
         source.process(new SourceMapFunction())
diff --git a/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java b/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java
index 064e193..000e090 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java
@@ -53,6 +53,9 @@ public class RocketMQOptions {
     public static final ConfigOption<Long> OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS =
             ConfigOptions.key("partitionDiscoveryIntervalMs").longType().defaultValue(30000L);
 
+    public static final ConfigOption<Boolean> OPTIONAL_USE_NEW_API =
+            ConfigOptions.key("useNewApi").booleanType().defaultValue(true);
+
     public static final ConfigOption<String> OPTIONAL_ENCODING =
             ConfigOptions.key("encoding").stringType().defaultValue("UTF-8");
 
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
index f106693..b946016 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
@@ -109,7 +109,7 @@ public class RowDeserializationSchema
         this.fieldTypes = new ValueType[totalColumnSize];
         this.columnIndexMapping = new HashMap<>();
         this.dataIndexMapping = new HashMap<>();
-        for (int index = 0; index < tableSchema.getFieldNames().length; index++) {
+        for (int index = 0; index < totalColumnSize; index++) {
             this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
         }
         for (int index = 0; index < totalColumnSize; index++) {
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
index ec41fc6..990e28b 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
@@ -56,6 +56,7 @@ import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_S
 import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_TIME_MILLS;
 import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TAG;
 import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TIME_ZONE;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_USE_NEW_API;
 import static org.apache.rocketmq.flink.source.common.RocketMQOptions.TOPIC;
 
 /**
@@ -90,6 +91,7 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
         optionalOptions.add(OPTIONAL_END_TIME);
         optionalOptions.add(OPTIONAL_TIME_ZONE);
         optionalOptions.add(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS);
+        optionalOptions.add(OPTIONAL_USE_NEW_API);
         optionalOptions.add(OPTIONAL_ENCODING);
         optionalOptions.add(OPTIONAL_FIELD_DELIMITER);
         optionalOptions.add(OPTIONAL_LINE_DELIMITER);
@@ -146,6 +148,7 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
         }
         long partitionDiscoveryIntervalMs =
                 configuration.getLong(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS);
+        boolean useNewApi = configuration.getBoolean(OPTIONAL_USE_NEW_API);
         DescriptorProperties descriptorProperties = new DescriptorProperties();
         descriptorProperties.putProperties(rawProperties);
         TableSchema physicalSchema =
@@ -161,7 +164,8 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
                 stopInMs,
                 startMessageOffset,
                 startMessageOffset < 0 ? startTime : -1L,
-                partitionDiscoveryIntervalMs);
+                partitionDiscoveryIntervalMs,
+                useNewApi);
     }
 
     private void transformContext(
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
index 37ab6a5..2a0d28a 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
@@ -17,6 +17,10 @@
 
 package org.apache.rocketmq.flink.source.table;
 
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
+import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
+import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema;
+import org.apache.rocketmq.flink.legacy.common.serialization.RowKeyValueDeserializationSchema;
 import org.apache.rocketmq.flink.source.RocketMQSource;
 import org.apache.rocketmq.flink.source.reader.deserializer.BytesMessage;
 import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema;
@@ -28,6 +32,7 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
 import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
 import org.apache.flink.table.data.RowData;
@@ -39,6 +44,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Stream;
 
 import static org.apache.flink.api.connector.source.Boundedness.BOUNDED;
@@ -59,6 +65,7 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
     private final long partitionDiscoveryIntervalMs;
     private final long startMessageOffset;
     private final long startTime;
+    private final boolean useNewApi;
 
     private List<String> metadataKeys;
 
@@ -72,7 +79,8 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
             long stopInMs,
             long startMessageOffset,
             long startTime,
-            long partitionDiscoveryIntervalMs) {
+            long partitionDiscoveryIntervalMs,
+            boolean useNewApi) {
         this.properties = properties;
         this.schema = schema;
         this.topic = topic;
@@ -83,6 +91,7 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
         this.startMessageOffset = startMessageOffset;
         this.startTime = startTime;
         this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
+        this.useNewApi = useNewApi;
         this.metadataKeys = Collections.emptyList();
     }
 
@@ -93,18 +102,25 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
-        return SourceProvider.of(
-                new RocketMQSource<>(
-                        topic,
-                        consumerGroup,
-                        nameServerAddress,
-                        tag,
-                        stopInMs,
-                        startTime,
-                        startMessageOffset < 0 ? 0 : startMessageOffset,
-                        partitionDiscoveryIntervalMs,
-                        isBounded() ? BOUNDED : CONTINUOUS_UNBOUNDED,
-                        createDeserializationSchema()));
+        if (useNewApi) {
+            return SourceProvider.of(
+                    new RocketMQSource<>(
+                            topic,
+                            consumerGroup,
+                            nameServerAddress,
+                            tag,
+                            stopInMs,
+                            startTime,
+                            startMessageOffset < 0 ? 0 : startMessageOffset,
+                            partitionDiscoveryIntervalMs,
+                            isBounded() ? BOUNDED : CONTINUOUS_UNBOUNDED,
+                            createRocketMQDeserializationSchema()));
+        } else {
+            return SourceFunctionProvider.of(
+                    new RocketMQSourceFunction<>(
+                            createKeyValueDeserializationSchema(), getConsumerProps()),
+                    isBounded());
+        }
     }
 
     @Override
@@ -133,17 +149,18 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
                         stopInMs,
                         startMessageOffset,
                         startTime,
-                        partitionDiscoveryIntervalMs);
+                        partitionDiscoveryIntervalMs,
+                        useNewApi);
         tableSource.metadataKeys = metadataKeys;
         return tableSource;
     }
 
     @Override
     public String asSummaryString() {
-        return "RocketMQScanTableSource";
+        return RocketMQScanTableSource.class.getName();
     }
 
-    private RocketMQDeserializationSchema<RowData> createDeserializationSchema() {
+    private RocketMQDeserializationSchema<RowData> createRocketMQDeserializationSchema() {
         final MetadataConverter[] metadataConverters =
                 metadataKeys.stream()
                         .map(
@@ -162,6 +179,32 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
         return stopInMs != Long.MAX_VALUE;
     }
 
+    /** Builds the row deserializer that the legacy source function is created with. */
+    private KeyValueDeserializationSchema<RowData> createKeyValueDeserializationSchema() {
+        return new RowKeyValueDeserializationSchema.Builder()
+                .setProperties(properties.asMap())
+                .setTableSchema(schema)
+                .build();
+    }
+
+    /**
+     * Collects the consumer settings handed to the legacy source function.
+     *
+     * <p>The tag is copied only when it is non-null: {@link Properties#setProperty} throws a
+     * {@link NullPointerException} for {@code null} values. NOTE(review): without the key the
+     * source function presumably falls back to its own default tag; confirm.
+     */
+    private Properties getConsumerProps() {
+        Properties consumerProps = new Properties();
+        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, topic);
+        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, consumerGroup);
+        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress);
+        if (tag != null) {
+            consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, tag);
+        }
+        return consumerProps;
+    }
+
     // --------------------------------------------------------------------------------------------
     // Metadata handling
     // --------------------------------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java
index c45dbdf..ad3c0b1 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java
@@ -1,17 +1,21 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.flink.legacy;
 
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
index a863ddd..7ce124d 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
@@ -1,17 +1,21 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.flink.legacy;
 
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -50,7 +54,7 @@ import static org.mockito.Mockito.when;
 @Ignore
 public class RocketMQSourceTest {
 
-    private RocketMQSource rocketMQSource;
+    private RocketMQSourceFunction rocketMQSource;
     private MQPullConsumerScheduleService pullConsumerScheduleService;
     private DefaultMQPullConsumer consumer;
     private KeyValueDeserializationSchema deserializationSchema;
@@ -60,7 +64,7 @@ public class RocketMQSourceTest {
     public void setUp() throws Exception {
         deserializationSchema = new SimpleKeyValueDeserializationSchema();
         Properties props = new Properties();
-        rocketMQSource = new RocketMQSource(deserializationSchema, props);
+        rocketMQSource = new RocketMQSourceFunction(deserializationSchema, props);
 
         setFieldValue(rocketMQSource, "topic", topic);
         setFieldValue(rocketMQSource, "runningChecker", new SingleRunningCheck());
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java
index b235c63..aa1528a 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java
@@ -1,17 +1,21 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.flink.legacy.common.selector;
 
 import org.junit.Test;
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java
index 5c0f755..dc93d14 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java
@@ -1,17 +1,21 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.flink.legacy.common.selector;
 
 import org.junit.Test;
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchemaTest.java
new file mode 100644
index 0000000..2c1786a
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchemaTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+/** Checks that {@link RowKeyValueDeserializationSchema} maps a body to a one-column row. */
+public class RowKeyValueDeserializationSchemaTest {
+
+    @Test
+    public void testDeserializeKeyAndValue() {
+        MessageExt message = new MessageExt();
+        message.setBody("test_deserialize_key_and_value".getBytes());
+        TableSchema schema =
+                new TableSchema.Builder().field("varchar", DataTypes.VARCHAR(100)).build();
+        RowKeyValueDeserializationSchema deserializer =
+                new RowKeyValueDeserializationSchema.Builder()
+                        .setProperties(new HashMap<>())
+                        .setTableSchema(schema)
+                        .build();
+        RowData row = deserializer.deserializeKeyAndValue(null, message.getBody());
+        assertEquals(new String(message.getBody()), row.getString(0).toString());
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
index 78baf20..7e2e0d9 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
@@ -1,17 +1,21 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.flink.legacy.common.serialization;
 
 import org.junit.Test;
diff --git a/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java b/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
new file mode 100644
index 0000000..184a23f
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.table;
+
+import org.apache.rocketmq.flink.source.common.RocketMQOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link RocketMQDynamicTableSourceFactory}. */
+public class RocketMQDynamicTableSourceFactoryTest {
+
+    private static final ResolvedSchema SCHEMA =
+            new ResolvedSchema(
+                    Collections.singletonList(Column.physical("name", STRING().notNull())),
+                    new ArrayList<>(),
+                    null);
+
+    private static final String IDENTIFIER = "rocketmq";
+    private static final String TOPIC = "test_source";
+    private static final String CONSUMER_GROUP = "test_consumer";
+    private static final String NAME_SERVER_ADDRESS = "127.0.0.1:9876";
+
+    /** Topic, consumer group and name server address are enough to build the scan source. */
+    @Test
+    public void testRocketMQDynamicTableSourceWithLegalOption() {
+        final Map<String, String> options = new HashMap<>();
+        options.put("connector", IDENTIFIER);
+        options.put(RocketMQOptions.TOPIC.key(), TOPIC);
+        options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP);
+        options.put(RocketMQOptions.NAME_SERVER_ADDRESS.key(), NAME_SERVER_ADDRESS);
+        final DynamicTableSource tableSource = createTableSource(options);
+        assertTrue(tableSource instanceof RocketMQScanTableSource);
+        assertEquals(RocketMQScanTableSource.class.getName(), tableSource.asSummaryString());
+    }
+
+    /** Leaving out the name server address must be rejected. */
+    @Test(expected = ValidationException.class)
+    public void testRocketMQDynamicTableSourceWithoutRequiredOption() {
+        final Map<String, String> options = new HashMap<>();
+        options.put("connector", IDENTIFIER);
+        options.put(RocketMQOptions.TOPIC.key(), TOPIC);
+        options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP);
+        options.put(RocketMQOptions.OPTIONAL_TAG.key(), "test_tag");
+        createTableSource(options);
+    }
+
+    /** Declares the connector so that only the "unknown" key can cause the rejection. */
+    @Test(expected = ValidationException.class)
+    public void testRocketMQDynamicTableSourceWithUnknownOption() {
+        final Map<String, String> options = new HashMap<>();
+        options.put("connector", IDENTIFIER);
+        options.put(RocketMQOptions.TOPIC.key(), TOPIC);
+        options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP);
+        options.put(RocketMQOptions.NAME_SERVER_ADDRESS.key(), NAME_SERVER_ADDRESS);
+        options.put("unknown", "test_option");
+        createTableSource(options);
+    }
+
+    /** Creates the table source via {@link FactoryUtil} with a fresh default configuration. */
+    private static DynamicTableSource createTableSource(Map<String, String> options) {
+        return FactoryUtil.createTableSource(
+                null,
+                ObjectIdentifier.of("default", "default", IDENTIFIER),
+                new ResolvedCatalogTable(
+                        CatalogTable.of(
+                                Schema.newBuilder().fromResolvedSchema(SCHEMA).build(),
+                                "mock source",
+                                Collections.emptyList(),
+                                options),
+                        SCHEMA),
+                new Configuration(),
+                RocketMQDynamicTableSourceFactory.class.getClassLoader(),
+                false);
+    }
+}