Posted to commits@bahir.apache.org by es...@apache.org on 2022/07/17 12:30:42 UTC

[bahir-flink] branch master updated (80a57e9 -> 2853a74)

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


 discard 80a57e9  fix: reformat code style
 discard 7b2563e  Update AbstractKuduInputFormat.java
 discard 3ac6a21  Update RowDataUpsertOperationMapper.java
 discard a36999b  Update RowResultRowDataConvertor.java
 discard ebb9085  Update RowDataUpsertOperationMapper.java
 discard e6c0017  Update RowResultRowDataConvertor.java
 discard a0102df  fix: fix ExtractionUtils.extractionError,update lookup function
 discard 959df22  fix: reformat code style
 discard 987b0d8  fix: reformat code style
 discard 73a4fbd  fix: reformat code style
 discard 10b9c36  fix: update KuduDynamicSinkTest
 discard d28d71f  fix: update KuduDynamicSinkTest
 discard 06d8eb9  fix: kudu.master config -> kudu.masters
 discard 62600e6  feat: add KuduRowDataInputFormatTest
 discard 3e24c95  feat: add dynamicSourceSinkFactory SPI Class&add dynamicSink test
 discard 3dec25b  feat: add dynamic source/sink&lookup function&hash bucket nums config
     new 2853a74  [BAHIR-305] Kudu Flink SQL Support DynamicSource/Sink&LookupFunction

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (80a57e9)
            \
             N -- N -- N   refs/heads/master (2853a74)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[bahir-flink] 01/01: [BAHIR-305] Kudu Flink SQL Support DynamicSource/Sink&LookupFunction

Posted by es...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git

commit 2853a745bbcb13d7a6bade70174b13af26b73b0a
Author: Joao Boto <bo...@boto.pro>
AuthorDate: Sun Jul 17 14:30:01 2022 +0200

    [BAHIR-305] Kudu Flink SQL Support DynamicSource/Sink&LookupFunction
---
 flink-connector-kudu/README.md                     | 104 +++++----
 .../connector/convertor/RowResultConvertor.java    |  39 ++++
 .../connector/convertor/RowResultRowConvertor.java |  44 ++++
 .../convertor/RowResultRowDataConvertor.java       |  88 ++++++++
 .../kudu/connector/reader/KuduReader.java          |  49 +++--
 .../kudu/connector/reader/KuduReaderConfig.java    |   2 +-
 .../kudu/connector/reader/KuduReaderIterator.java  |  30 +--
 .../kudu/connector/writer/KuduWriterConfig.java    |   2 +-
 .../writer/RowDataUpsertOperationMapper.java       | 144 ++++++++++++
 .../AbstractKuduInputFormat.java}                  |  45 ++--
 .../kudu/{batch => format}/KuduOutputFormat.java   |   2 +-
 .../kudu/format/KuduRowDataInputFormat.java        |  52 +++++
 .../connectors/kudu/format/KuduRowInputFormat.java |  55 +++++
 .../flink/connectors/kudu/table/KuduCatalog.java   |   5 +
 .../connectors/kudu/table/KuduCatalogFactory.java  |  73 -------
 .../connectors/kudu/table/KuduTableFactory.java    |   1 -
 .../connectors/kudu/table/KuduTableSource.java     |  40 ++--
 .../kudu/table/dynamic/KuduDynamicTableSink.java   |  88 ++++++++
 .../kudu/table/dynamic/KuduDynamicTableSource.java | 173 +++++++++++++++
 .../dynamic/KuduDynamicTableSourceSinkFactory.java | 243 +++++++++++++++++++++
 .../table/dynamic/catalog/KuduCatalogFactory.java  |  71 ++++++
 .../catalog/KuduDynamicCatalog.java}               |  58 ++---
 .../table/function/lookup/KuduLookupOptions.java   |  81 +++++++
 .../function/lookup/KuduRowDataLookupFunction.java | 233 ++++++++++++++++++++
 .../kudu/table/utils/KuduTableUtils.java           |  68 +++---
 ...ry => org.apache.flink.table.factories.Factory} |   3 +-
 .../org.apache.flink.table.factories.TableFactory  |   2 +-
 .../connectors/kudu/connector/KuduTestBase.java    | 123 ++++++++---
 .../{batch => format}/KuduOutputFormatTest.java    |   2 +-
 .../KuduRowDataInputFormatTest.java}               |  49 ++++-
 .../KuduRowInputFormatTest.java}                   |  18 +-
 .../kudu/table/KuduTableSourceITCase.java          |   4 +-
 .../kudu/table/dynamic/KuduDynamicSinkTest.java    |  84 +++++++
 .../kudu/table/dynamic/KuduDynamicSourceTest.java  | 164 ++++++++++++++
 .../dynamic/KuduRowDataLookupFunctionTest.java     | 150 +++++++++++++
 .../writer/RowDataUpsertOperationMapperTest.java   |  57 +++++
 36 files changed, 2129 insertions(+), 317 deletions(-)

diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
index a0e0234..c52adb1 100644
--- a/flink-connector-kudu/README.md
+++ b/flink-connector-kudu/README.md
@@ -2,18 +2,17 @@
 
 This connector provides a source (```KuduInputFormat```), a sink/output
 (```KuduSink``` and ```KuduOutputFormat```, respectively),
- as well a table source (`KuduTableSource`), an upsert table sink (`KuduTableSink`), and a catalog (`KuduCatalog`),
- to allow reading and writing to [Kudu](https://kudu.apache.org/).
+as well as a table source (`KuduTableSource`), an upsert table sink (`KuduTableSink`), and a catalog (`KuduCatalog`),
+to allow reading and writing to [Kudu](https://kudu.apache.org/).
 
 To use this connector, add the following dependency to your project:
 
-    <dependency>
-      <groupId>org.apache.bahir</groupId>
-      <artifactId>flink-connector-kudu_2.11</artifactId>
-      <version>1.1-SNAPSHOT</version>
-    </dependency>
-
- *Version Compatibility*: This module is compatible with Apache Kudu *1.11.1* (last stable version) and Apache Flink 1.10.+.
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-kudu_2.11</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+*Version Compatibility*: This module is compatible with Apache Kudu *1.11.1* (last stable version) and Apache Flink 1.10.+.
 
 Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
 See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/dependencies.html).
@@ -45,7 +44,6 @@ catalogs:
     type: kudu
     kudu.masters: <host>:7051
 ```
-
 Once the SQL CLI is started you can simply switch to the Kudu catalog by calling `USE CATALOG kudu;`
 
 You can also create and use the KuduCatalog directly in the Table environment:
@@ -56,12 +54,12 @@ KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS);
 tableEnv.registerCatalog("kudu", catalog);
 tableEnv.useCatalog("kudu");
 ```
-
 ### DDL operations using SQL
 
 It is possible to manipulate Kudu tables using SQL DDL.
 
 When not using the Kudu catalog, the following additional properties must be specified in the `WITH` clause:
+
 * `'connector.type'='kudu'`
 * `'kudu.masters'='host1:port1,host2:port2,...'`: comma-delimited list of Kudu masters
 * `'kudu.table'='...'`: The table's name within the Kudu database.
@@ -89,8 +87,8 @@ CREATE TABLE TestTable (
   'kudu.primary-key-columns' = 'first,second'
 )
 ```
-
 Other catalogs
+
 ```
 CREATE TABLE TestTable (
   first STRING,
@@ -104,17 +102,16 @@ CREATE TABLE TestTable (
   'kudu.primary-key-columns' = 'first,second'
 )
 ```
-
 Renaming a table:
+
 ```
 ALTER TABLE TestTable RENAME TO TestTableRen
 ```
-
 Dropping a table:
+
 ```sql
 DROP TABLE TestTableRen
 ```
-
 #### Creating a KuduTable directly with KuduCatalog
 
 The KuduCatalog also exposes a simple `createTable` method where the table configuration,
@@ -123,7 +120,7 @@ including schema, partitioning, replication, etc. can be specified using a `Kudu
 Use the `createTableIfNotExists` method, which takes a `ColumnSchemasFactory` and
 a `CreateTableOptionsFactory` parameter; these implement, respectively, `getColumnSchemas()`
 returning a list of Kudu [ColumnSchema](https://kudu.apache.org/apidocs/org/apache/kudu/ColumnSchema.html) objects;
- and  `getCreateTableOptions()` returning a
+and  `getCreateTableOptions()` returning a
 [CreateTableOptions](https://kudu.apache.org/apidocs/org/apache/kudu/client/CreateTableOptions.html) object.
 
 This example shows the creation of a table called `ExampleTable` with two columns,
@@ -155,32 +152,46 @@ Read more about Kudu schema design in the [Kudu docs](https://kudu.apache.org/do
 
 ### Supported data types
 
-| Flink/SQL            | Kudu                    |
-|----------------------|:-----------------------:|
-| `STRING`             | STRING                  |
-| `BOOLEAN`            | BOOL                    |
-| `TINYINT`            | INT8                    |
-| `SMALLINT`           | INT16                   |
-| `INT`                | INT32                   |
-| `BIGINT`             | INT64                   |
-| `FLOAT`              | FLOAT                   |
-| `DOUBLE`             | DOUBLE                  |
-| `BYTES`              | BINARY                  |
-| `TIMESTAMP(3)`       | UNIXTIME_MICROS         |
+
+| Flink/SQL      |      Kudu      |
+| ---------------- | :---------------: |
+| `STRING`       |     STRING     |
+| `BOOLEAN`      |      BOOL      |
+| `TINYINT`      |      INT8      |
+| `SMALLINT`     |      INT16      |
+| `INT`          |      INT32      |
+| `BIGINT`       |      INT64      |
+| `FLOAT`        |      FLOAT      |
+| `DOUBLE`       |     DOUBLE     |
+| `BYTES`        |     BINARY     |
+| `TIMESTAMP(3)` | UNIXTIME_MICROS |
 
 Note:
-* `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java conversion class is `java.sql.Timestamp` 
+
+* `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java conversion class is `java.sql.Timestamp`
 * `BINARY` and `VARBINARY` are not yet supported - use `BYTES`, which is a `VARBINARY(2147483647)`
-*  `CHAR` and `VARCHAR` are not yet supported - use `STRING`, which is a `VARCHAR(2147483647)`
+* `CHAR` and `VARCHAR` are not yet supported - use `STRING`, which is a `VARCHAR(2147483647)`
 * `DECIMAL` types are not yet supported
 
+### Lookup Cache
+
+The Kudu connector can be used in a temporal join as a lookup source (a.k.a. dimension table). Currently, only synchronous lookup mode is supported.
+
+By default, the lookup cache is not enabled. You can enable it by setting both `lookup.cache.max-rows` and `lookup.cache.ttl`.
+
+The lookup cache is used to improve the performance of temporal joins against the Kudu connector. By default, the lookup cache is not enabled, so all requests are sent to the external database. When the lookup cache is enabled, each process (i.e. TaskManager) holds a cache. Flink looks up the cache first and only sends requests to the external database on a cache miss, updating the cache with the rows returned. The oldest rows in the cache expire when the cache reaches the maximum number of cached rows (`kudu.lookup.cache.max-rows`) or when a row exceeds the maximum time to live (`kudu.lookup.cache.ttl`).
+
+Reference: [Flink JDBC Connector](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/jdbc/#lookup-cache)
+
+
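
A minimal sketch of enabling the cache from the Table API, assuming `env` is a `StreamExecutionEnvironment` and that the factory exposes the `kudu.`-prefixed keys referenced at the end of the paragraph above (check `KuduDynamicTableSourceSinkFactory` for the authoritative names):

```java
// Hedged sketch: register a Kudu dimension table with the lookup cache enabled.
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(
        "CREATE TABLE dim_table (id INT, name STRING) WITH ("
        + " 'connector' = 'kudu',"
        + " 'kudu.masters' = 'host1:7051',"
        + " 'kudu.table' = 'dim_table',"
        + " 'kudu.lookup.cache.max-rows' = '10000',"  // evict oldest rows beyond 10k
        + " 'kudu.lookup.cache.ttl' = '10min')");     // expire rows after 10 minutes
// The table can then be joined as a dimension table:
//   SELECT o.id, d.name FROM orders AS o
//   JOIN dim_table FOR SYSTEM_TIME AS OF o.proc_time AS d ON o.id = d.id
```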
 ### Known limitations
+
 * Data type limitations (see above).
 * SQL Create table: primary keys can only be set by the `kudu.primary-key-columns` property, using the
-`PRIMARY KEY` constraint is not yet possible.
+  `PRIMARY KEY` constraint is not yet possible.
 * SQL Create table: range partitioning is not supported.
 * When getting a table through the Catalog, NOT NULL and PRIMARY KEY constraints are ignored. All columns
-are described as being nullable, and not being primary keys.
+  are described as being nullable, and not being primary keys.
 * Kudu tables cannot be altered through the catalog other than simple renaming
 
 ## DataStream API
@@ -192,13 +203,15 @@ with Kudu data.
 ### Reading tables into a DataStream
 
 There are 2 main ways of reading a Kudu Table into a DataStream
- 1. Using the `KuduCatalog` and the Table API
- 2. Using the `KuduRowInputFormat` directly
+
+1. Using the `KuduCatalog` and the Table API
+2. Using the `KuduRowInputFormat` directly
 
 Using the `KuduCatalog` and Table API is the recommended way of reading tables as it automatically
 guarantees type safety and takes care of configuration of our readers.
 
 This is how it works in practice:
+
 ```java
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableSettings);
 
@@ -208,7 +221,6 @@ tableEnv.useCatalog("kudu");
 Table table = tableEnv.sqlQuery("SELECT * FROM MyKuduTable");
 DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);
 ```
-
 The second way of achieving the same thing is by using the `KuduRowInputFormat` directly.
 In this case we have to manually provide all information about our table:
 
@@ -219,18 +231,19 @@ KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo)
 
 DataStream<Row> rowStream = env.createInput(inputFormat, rowTypeInfo);
 ```
-
 At the end of the day the `KuduTableSource` is just a convenient wrapper around the `KuduRowInputFormat`.
 
 ### Kudu Sink
+
 The connector provides a `KuduSink` class that can be used to consume DataStreams
 and write the results into a Kudu table.
 
 The constructor takes 3 or 4 arguments.
- * `KuduWriterConfig` is used to specify the Kudu masters and the flush mode.
- * `KuduTableInfo` identifies the table to be written
- * `KuduOperationMapper` maps the records coming from the DataStream to a list of Kudu operations.
- * `KuduFailureHandler` (optional): If you want to provide your own logic for handling writing failures.
+
+* `KuduWriterConfig` is used to specify the Kudu masters and the flush mode.
+* `KuduTableInfo` identifies the table to be written
+* `KuduOperationMapper` maps the records coming from the DataStream to a list of Kudu operations.
+* `KuduFailureHandler` (optional): If you want to provide your own logic for handling writing failures.
 
 The example below shows the creation of a sink for Row type records of 3 fields. It upserts each record.
 It is assumed that a Kudu table with columns `col1, col2, col3` called `AlreadyExistingTable` exists. Note that if this were not the case,
@@ -248,7 +261,6 @@ KuduSink<Row> sink = new KuduSink<>(
             AbstractSingleOperationMapper.KuduOperation.UPSERT)
 )
 ```
-
 #### KuduOperationMapper
 
 This section describes the Operation mapping logic in more detail.
@@ -272,12 +284,13 @@ It is also possible to implement your own logic by overriding the
 `createBaseOperation` method that returns a Kudu [Operation](https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html).
 
 There are pre-defined operation mappers for Pojo, Flink Row, and Flink Tuple types for constant operation, 1-to-1 sinks.
+
 * `PojoOperationMapper`: Each table column must correspond to a POJO field
-with the same name. The  `columnNames` array should contain those fields of the POJO that
-are present as table columns (the POJO fields can be a superset of table columns).
+  with the same name. The  `columnNames` array should contain those fields of the POJO that
+  are present as table columns (the POJO fields can be a superset of table columns).
 * `RowOperationMapper` and `TupleOperationMapper`: the mapping is based on position. The
-`i`th field of the Row/Tuple corresponds to the column of the table at the `i`th
-position in the `columnNames` array.
+  `i`th field of the Row/Tuple corresponds to the column of the table at the `i`th
+  position in the `columnNames` array.
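
As a hedged illustration of the name-based mapping, a hypothetical `Customer` POJO written through `PojoOperationMapper` (the `(class, columnNames, operation)` constructor shape is an assumption; see `AbstractSingleOperationMapper` for the operation enum):

```java
// Hypothetical POJO: fields may be a superset of the table columns.
public class Customer {
    public Integer id;          // maps to column "id"
    public String name;         // maps to column "name"
    public String internalTag;  // extra POJO field, simply not listed below
}

PojoOperationMapper<Customer> mapper = new PojoOperationMapper<>(
        Customer.class,
        new String[]{"id", "name"},  // only these fields are written
        AbstractSingleOperationMapper.KuduOperation.UPSERT);
```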
 
 ## Building the connector
 
@@ -287,7 +300,6 @@ The connector can be easily built by using maven:
 cd bahir-flink
 mvn clean install
 ```
-
 ### Running the tests
 
 The integration tests rely on the Kudu test harness which requires the current user to be able to ssh to localhost.
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultConvertor.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultConvertor.java
new file mode 100644
index 0000000..2cbb97e
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultConvertor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.convertor;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.kudu.client.RowResult;
+
+import java.io.Serializable;
+
+/**
+ * Converts the Kudu RowResult object into the corresponding Flink internal row object.
+ *
+ * @param <T> the Flink row type this convertor produces
+ */
+@Internal
+public interface RowResultConvertor<T> extends Serializable {
+
+    /**
+     * Convert a Kudu RowResult into the corresponding format.
+     *
+     * @param row the Kudu RowResult to convert
+     * @return the converted row as {@link T}
+     */
+    T convertor(RowResult row);
+}
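
For illustration, a custom convertor is a single method; a minimal hypothetical String-producing implementation (using the Kudu client's `RowResult#rowToString` helper) could look like this:

```java
package org.apache.flink.connectors.kudu.connector.convertor;

import org.apache.kudu.client.RowResult;

/** Illustrative only: renders every projected column of the row as a String. */
public class RowResultStringConvertor implements RowResultConvertor<String> {
    @Override
    public String convertor(RowResult row) {
        return row.rowToString();
    }
}
```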
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowConvertor.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowConvertor.java
new file mode 100644
index 0000000..b6f4eee
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowConvertor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.convertor;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.Row;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.RowResult;
+
+/**
+ * Transform the Kudu RowResult object into a Flink Row object
+ */
+@Internal
+public class RowResultRowConvertor implements RowResultConvertor<Row> {
+    @Override
+    public Row convertor(RowResult row) {
+        Schema schema = row.getColumnProjection();
+
+        Row values = new Row(schema.getColumnCount());
+        schema.getColumns().forEach(column -> {
+            String name = column.getName();
+            int pos = schema.getColumnIndex(name);
+            if (row.isNull(name)) {
+                return;
+            }
+            values.setField(pos, row.getObject(name));
+        });
+        return values;
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java
new file mode 100644
index 0000000..b7dc702
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.connector.convertor;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.RowResult;
+
+import java.math.BigDecimal;
+import java.util.Objects;
+
+/**
+ * Transform the Kudu RowResult object into a Flink RowData object
+ */
+public class RowResultRowDataConvertor implements RowResultConvertor<RowData> {
+    @Override
+    public RowData convertor(RowResult row) {
+        Schema schema = row.getColumnProjection();
+        GenericRowData values = new GenericRowData(schema.getColumnCount());
+        schema.getColumns().forEach(column -> {
+            String name = column.getName();
+            Type type = column.getType();
+            int pos = schema.getColumnIndex(name);
+            if (Objects.isNull(type)) {
+                throw new IllegalArgumentException("columnName:" + name);
+            }
+            if (row.isNull(name)) {
+                return;
+            }
+            switch (type) {
+                case DECIMAL:
+                    BigDecimal decimal = row.getDecimal(name);
+                    values.setField(pos, DecimalData.fromBigDecimal(decimal, decimal.precision(), decimal.scale()));
+                    break;
+                case UNIXTIME_MICROS:
+                    values.setField(pos, TimestampData.fromTimestamp(row.getTimestamp(name)));
+                    break;
+                case DOUBLE:
+                    values.setField(pos, row.getDouble(name));
+                    break;
+                case STRING:
+                    Object value = row.getObject(name);
+                    values.setField(pos, StringData.fromString(Objects.nonNull(value) ? value.toString() : ""));
+                    break;
+                case BINARY:
+                    values.setField(pos, row.getBinary(name));
+                    break;
+                case FLOAT:
+                    values.setField(pos, row.getFloat(name));
+                    break;
+                case INT64:
+                    values.setField(pos, row.getLong(name));
+                    break;
+                case INT32:
+                case INT16:
+                case INT8:
+                    values.setField(pos, row.getInt(name));
+                    break;
+                case BOOL:
+                    values.setField(pos, row.getBoolean(name));
+                    break;
+                default:
+                    throw new IllegalArgumentException("columnName:" + name + ", type:" + type.getName() + " is not supported!");
+            }
+        });
+        return values;
+    }
+}
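
A hedged usage sketch of the convertor in a bare scan loop, assuming `scanner` is a `KuduScanner` already built from a `KuduClient`:

```java
// Sketch: convert each Kudu RowResult into Flink's internal RowData.
RowResultConvertor<RowData> convertor = new RowResultRowDataConvertor();
while (scanner.hasMoreRows()) {
    RowResultIterator batch = scanner.nextRows();  // one RPC batch of rows
    while (batch.hasNext()) {
        RowData row = convertor.convertor(batch.next());
        // hand 'row' to the Flink runtime here
    }
}
```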
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
index d7a0c61..6816fc3 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
@@ -21,7 +21,9 @@ import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
 import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduScanToken;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.KuduTable;
@@ -34,40 +36,50 @@ import java.util.ArrayList;
 import java.util.List;
 
 @Internal
-public class KuduReader implements AutoCloseable {
+public class KuduReader<T> implements AutoCloseable {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final KuduTableInfo tableInfo;
     private final KuduReaderConfig readerConfig;
-    private final List<KuduFilterInfo> tableFilters;
-    private final List<String> tableProjections;
+    private List<KuduFilterInfo> tableFilters;
+    private List<String> tableProjections;
+    private final RowResultConvertor<T> rowResultConvertor;
 
-    private transient KuduClient client;
-    private transient KuduSession session;
-    private transient KuduTable table;
+    private final transient KuduClient client;
+    private final transient KuduSession session;
+    private final transient KuduTable table;
 
-    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig) throws IOException {
-        this(tableInfo, readerConfig, new ArrayList<>(), null);
+    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor) throws IOException {
+        this(tableInfo, readerConfig, rowResultConvertor, new ArrayList<>(), null);
     }
 
-    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters) throws IOException {
-        this(tableInfo, readerConfig, tableFilters, null);
+    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor, List<KuduFilterInfo> tableFilters) throws IOException {
+        this(tableInfo, readerConfig, rowResultConvertor, tableFilters, null);
     }
 
-    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, List<KuduFilterInfo> tableFilters, List<String> tableProjections) throws IOException {
+    public KuduReader(KuduTableInfo tableInfo, KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor, List<KuduFilterInfo> tableFilters, List<String> tableProjections) throws IOException {
         this.tableInfo = tableInfo;
         this.readerConfig = readerConfig;
         this.tableFilters = tableFilters;
         this.tableProjections = tableProjections;
-
+        this.rowResultConvertor = rowResultConvertor;
         this.client = obtainClient();
         this.session = obtainSession();
         this.table = obtainTable();
     }
 
+    public void setTableFilters(List<KuduFilterInfo> tableFilters) {
+        this.tableFilters = tableFilters;
+    }
+
+    public void setTableProjections(List<String> tableProjections) {
+        this.tableProjections = tableProjections;
+    }
+
     private KuduClient obtainClient() {
-        return new KuduClient.KuduClientBuilder(readerConfig.getMasters()).build();
+        return new KuduClient.KuduClientBuilder(readerConfig.getMasters())
+                .build();
     }
 
     private KuduSession obtainSession() {
@@ -85,8 +97,8 @@ public class KuduReader implements AutoCloseable {
         throw new RuntimeException("Table " + tableName + " does not exist.");
     }
 
-    public KuduReaderIterator scanner(byte[] token) throws IOException {
-        return new KuduReaderIterator(KuduScanToken.deserializeIntoScanner(token, client));
+    public KuduReaderIterator<T> scanner(byte[] token) throws IOException {
+        return new KuduReaderIterator<>(KuduScanToken.deserializeIntoScanner(token, client), rowResultConvertor);
     }
 
     public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Integer rowLimit) {
@@ -152,9 +164,7 @@ public class KuduReader implements AutoCloseable {
      * @return Formatted URL
      */
     private String getLocation(String host, Integer port) {
-        StringBuilder builder = new StringBuilder();
-        builder.append(host).append(":").append(port);
-        return builder.toString();
+        return host + ":" + port;
     }
 
     @Override
@@ -163,7 +173,7 @@ public class KuduReader implements AutoCloseable {
             if (session != null) {
                 session.close();
             }
-        } catch (Exception e) {
+        } catch (KuduException e) {
             log.error("Error while closing session.", e);
         }
         try {
@@ -175,3 +185,4 @@ public class KuduReader implements AutoCloseable {
         }
     }
 }
+
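Since `KuduReader` is now generic, the element type is fixed by the convertor passed at construction; a minimal sketch (master address and table name are placeholders):

```java
// Sketch: a Row-typed reader; pass RowResultRowDataConvertor for RowData instead.
KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters("host1:7051").build();
KuduTableInfo tableInfo = KuduTableInfo.forTable("ExampleTable");
KuduReader<Row> reader = new KuduReader<>(tableInfo, readerConfig, new RowResultRowConvertor());
```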
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
index 96fde1b..468cb1e 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
@@ -25,7 +25,7 @@ import java.io.Serializable;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Configuration used by {@link org.apache.flink.connectors.kudu.batch.KuduRowInputFormat}. Specifies connection and other necessary properties.
+ * Configuration used by {@link org.apache.flink.connectors.kudu.format.KuduRowInputFormat}. Specifies connection and other necessary properties.
  */
 @PublicEvolving
 public class KuduReaderConfig implements Serializable {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
index 1ea4690..543e41e 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
@@ -17,22 +17,24 @@
 package org.apache.flink.connectors.kudu.connector.reader;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.types.Row;
-
-import org.apache.kudu.Schema;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduScanner;
 import org.apache.kudu.client.RowResult;
 import org.apache.kudu.client.RowResultIterator;
 
+import java.io.Serializable;
+
 @Internal
-public class KuduReaderIterator {
+public class KuduReaderIterator<T> implements Serializable {
 
-    private KuduScanner scanner;
+    private final KuduScanner scanner;
+    private final RowResultConvertor<T> rowResultConvertor;
     private RowResultIterator rowIterator;
 
-    public KuduReaderIterator(KuduScanner scanner) throws KuduException {
+    public KuduReaderIterator(KuduScanner scanner, RowResultConvertor<T> rowResultConvertor) throws KuduException {
         this.scanner = scanner;
+        this.rowResultConvertor = rowResultConvertor;
         nextRows();
     }
 
@@ -51,24 +53,12 @@ public class KuduReaderIterator {
         }
     }
 
-    public Row next() {
+    public T next() {
         RowResult row = this.rowIterator.next();
-        return toFlinkRow(row);
+        return rowResultConvertor.convertor(row);
     }
 
     private void nextRows() throws KuduException {
         this.rowIterator = scanner.nextRows();
     }
-
-    private Row toFlinkRow(RowResult row) {
-        Schema schema = row.getColumnProjection();
-
-        Row values = new Row(schema.getColumnCount());
-        schema.getColumns().forEach(column -> {
-            String name = column.getName();
-            int pos = schema.getColumnIndex(name);
-            values.setField(pos, row.getObject(name));
-        });
-        return values;
-    }
 }
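
Consumption keeps the old shape, only typed; a hedged sketch assuming the `reader` above and a serialized scan `token` as produced by `scanTokens`:

```java
// Sketch: the iterator yields whatever type the injected convertor produces.
KuduReaderIterator<Row> iterator = reader.scanner(token);
while (iterator.hasNext()) {    // hasNext() may throw KuduException
    Row row = iterator.next();  // conversion is delegated to the convertor
    System.out.println(row);
}
```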
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
index ff93921..6c6d216 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
@@ -28,7 +28,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.kudu.client.SessionConfiguration.FlushMode;
 
 /**
- * Configuration used by {@link org.apache.flink.connectors.kudu.streaming.KuduSink} and {@link org.apache.flink.connectors.kudu.batch.KuduOutputFormat}.
+ * Configuration used by {@link org.apache.flink.connectors.kudu.streaming.KuduSink} and {@link org.apache.flink.connectors.kudu.format.KuduOutputFormat}.
  * Specifies connection and other necessary properties.
  */
 @PublicEvolving
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/RowDataUpsertOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/RowDataUpsertOperationMapper.java
new file mode 100644
index 0000000..d4f0182
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/RowDataUpsertOperationMapper.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+
+@Internal
+public class RowDataUpsertOperationMapper extends AbstractSingleOperationMapper<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RowDataUpsertOperationMapper.class);
+
+
+    private static final int MIN_TIME_PRECISION = 0;
+    private static final int MAX_TIME_PRECISION = 3;
+    private static final int MIN_TIMESTAMP_PRECISION = 0;
+    private static final int MAX_TIMESTAMP_PRECISION = 6;
+
+    private LogicalType[] logicalTypes;
+
+    public RowDataUpsertOperationMapper(TableSchema schema) {
+        super(schema.getFieldNames());
+        logicalTypes = Arrays.stream(schema.getFieldDataTypes())
+                .map(DataType::getLogicalType)
+                .toArray(LogicalType[]::new);
+    }
+
+    @Override
+    public Object getField(RowData input, int i) {
+        return getFieldValue(input, i);
+    }
+
+    public Object getFieldValue(RowData input, int i) {
+        if (input == null || input.isNullAt(i)) {
+            return null;
+        }
+        LogicalType fieldType = logicalTypes[i];
+        switch (fieldType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR: {
+                StringData data = input.getString(i);
+                if (data != null) {
+                    return data.toString();
+                }
+                return null;
+            }
+            case BOOLEAN:
+                return input.getBoolean(i);
+            case BINARY:
+            case VARBINARY:
+                return input.getBinary(i);
+            case DECIMAL: {
+                DecimalType decimalType = (DecimalType) fieldType;
+                final int precision = decimalType.getPrecision();
+                final int scale = decimalType.getScale();
+                DecimalData data = input.getDecimal(i, precision, scale);
+                if (data != null) {
+                    return data.toBigDecimal();
+                } else {
+                    return null;
+                }
+            }
+            case TINYINT:
+                return input.getByte(i);
+            case SMALLINT:
+                return input.getShort(i);
+            case INTEGER:
+            case DATE:
+            case INTERVAL_YEAR_MONTH:
+                return input.getInt(i);
+            case TIME_WITHOUT_TIME_ZONE:
+                final int timePrecision = getPrecision(fieldType);
+                if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
+                    throw new UnsupportedOperationException(
+                            String.format("The precision %s of TIME type is out of the range [%s, %s] supported by " +
+                                    "kudu connector", timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
+                }
+                return input.getInt(i);
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return input.getLong(i);
+            case FLOAT:
+                return input.getFloat(i);
+            case DOUBLE:
+                return input.getDouble(i);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int timestampPrecision = getPrecision(fieldType);
+                if (timestampPrecision < MIN_TIMESTAMP_PRECISION || timestampPrecision > MAX_TIMESTAMP_PRECISION) {
+                    throw new UnsupportedOperationException(
+                            String.format("The precision %s of TIMESTAMP type is out of the range [%s, %s] supported " +
+                                            "by kudu connector", timestampPrecision, MIN_TIMESTAMP_PRECISION,
+                                    MAX_TIMESTAMP_PRECISION));
+                }
+                return input.getTimestamp(i, timestampPrecision).toTimestamp();
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + fieldType);
+        }
+    }
+
+    @Override
+    public Optional<Operation> createBaseOperation(RowData input, KuduTable table) {
+        Optional<Operation> operation = Optional.empty();
+        switch (input.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                operation = Optional.of(table.newUpsert());
+                break;
+            case DELETE:
+                operation = Optional.of(table.newDelete());
+                break;
+        }
+        return operation;
+    }
+}
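
A hedged wiring sketch: the mapper upserts INSERT/UPDATE_AFTER rows and deletes DELETE rows based on the RowKind, so a changelog stream can be written with the existing `KuduSink` (schema, master address and table name are illustrative):

```java
TableSchema schema = TableSchema.builder()
        .field("id", DataTypes.INT())
        .field("name", DataTypes.STRING())
        .build();

KuduSink<RowData> sink = new KuduSink<>(
        KuduWriterConfig.Builder.setMasters("host1:7051").build(),
        KuduTableInfo.forTable("ExampleTable"),
        new RowDataUpsertOperationMapper(schema));  // RowKind picks upsert vs delete
```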
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduRowInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java
similarity index 70%
rename from flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduRowInputFormat.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java
index dc17ed4..0cdf570 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduRowInputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java
@@ -14,22 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.connectors.kudu.batch;
+package org.apache.flink.connectors.kudu.format;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
 import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
+import org.apache.flink.connectors.kudu.table.KuduCatalog;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.types.Row;
-
 import org.apache.kudu.client.KuduException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,39 +43,42 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Input format for reading the contents of a Kudu table (defined by the provided {@link KuduTableInfo}) in both batch
- * and stream programs. Rows of the Kudu table are mapped to {@link Row} instances that can converted to other data
+ * and stream programs. Rows of the Kudu table are mapped to {@link T} instances that can be converted to other data
  * types by the user later if necessary.
  *
- * <p> For programmatic access to the schema of the input rows users can use the {@link org.apache.flink.connectors.kudu.table.KuduCatalog}
+ * <p> For programmatic access to the schema of the input rows users can use the {@link KuduCatalog}
  * or overwrite the column order manually by providing a list of projected column names.
  */
 @PublicEvolving
-public class KuduRowInputFormat extends RichInputFormat<Row, KuduInputSplit> {
+public abstract class AbstractKuduInputFormat<T> extends RichInputFormat<T, KuduInputSplit> implements ResultTypeQueryable<T> {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final KuduReaderConfig readerConfig;
     private final KuduTableInfo tableInfo;
-
-    private List<KuduFilterInfo> tableFilters;
-    private List<String> tableProjections;
-
+    private final List<KuduFilterInfo> tableFilters;
+    private final List<String> tableProjections;
+    private final RowResultConvertor<T> rowResultConvertor;
     private boolean endReached;
+    private transient KuduReader<T> kuduReader;
+    private transient KuduReaderIterator<T> resultIterator;
 
-    private transient KuduReader kuduReader;
-    private transient KuduReaderIterator resultIterator;
-
-    public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo) {
-        this(readerConfig, tableInfo, new ArrayList<>(), null);
+    public AbstractKuduInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor,
+                                   KuduTableInfo tableInfo) {
+        this(readerConfig, rowResultConvertor, tableInfo, new ArrayList<>(), null);
     }
 
-    public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, List<String> tableProjections) {
-        this(readerConfig, tableInfo, new ArrayList<>(), tableProjections);
+    public AbstractKuduInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor,
+                                   KuduTableInfo tableInfo, List<String> tableProjections) {
+        this(readerConfig, rowResultConvertor, tableInfo, new ArrayList<>(), tableProjections);
     }
 
-    public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, List<KuduFilterInfo> tableFilters, List<String> tableProjections) {
+    public AbstractKuduInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor,
+                                   KuduTableInfo tableInfo, List<KuduFilterInfo> tableFilters,
+                                   List<String> tableProjections) {
 
         this.readerConfig = checkNotNull(readerConfig, "readerConfig could not be null");
+        this.rowResultConvertor = checkNotNull(rowResultConvertor, "rowResultConvertor could not be null");
         this.tableInfo = checkNotNull(tableInfo, "tableInfo could not be null");
         this.tableFilters = checkNotNull(tableFilters, "tableFilters could not be null");
         this.tableProjections = tableProjections;
@@ -96,7 +100,7 @@ public class KuduRowInputFormat extends RichInputFormat<Row, KuduInputSplit> {
 
     private void startKuduReader() throws IOException {
         if (kuduReader == null) {
-            kuduReader = new KuduReader(tableInfo, readerConfig, tableFilters, tableProjections);
+            kuduReader = new KuduReader<>(tableInfo, readerConfig, rowResultConvertor, tableFilters, tableProjections);
         }
     }
 
@@ -137,7 +141,7 @@ public class KuduRowInputFormat extends RichInputFormat<Row, KuduInputSplit> {
     }
 
     @Override
-    public Row nextRecord(Row reuse) throws IOException {
+    public T nextRecord(T reuse) throws IOException {
         // check that current iterator has next rows
         if (this.resultIterator.hasNext()) {
             return resultIterator.next();
@@ -146,5 +150,4 @@ public class KuduRowInputFormat extends RichInputFormat<Row, KuduInputSplit> {
             return null;
         }
     }
-
 }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java
similarity index 98%
rename from flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java
rename to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java
index 4bf81fe..900515d 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.connectors.kudu.batch;
+package org.apache.flink.connectors.kudu.format;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.RichOutputFormat;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduRowDataInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduRowDataInputFormat.java
new file mode 100644
index 0000000..bb75f61
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduRowDataInputFormat.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+/**
+ * InputFormat based on the {@link RowData} object type
+ */
+@PublicEvolving
+public class KuduRowDataInputFormat extends AbstractKuduInputFormat<RowData> {
+
+    public KuduRowDataInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<RowData> rowResultConvertor, KuduTableInfo tableInfo) {
+        super(readerConfig, rowResultConvertor, tableInfo);
+    }
+
+    public KuduRowDataInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<RowData> rowResultConvertor, KuduTableInfo tableInfo, List<String> tableProjections) {
+        super(readerConfig, rowResultConvertor, tableInfo, tableProjections);
+    }
+
+    public KuduRowDataInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<RowData> rowResultConvertor, KuduTableInfo tableInfo, List<KuduFilterInfo> tableFilters, List<String> tableProjections) {
+        super(readerConfig, rowResultConvertor, tableInfo, tableFilters, tableProjections);
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return TypeInformation.of(RowData.class);
+    }
+}
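
Because the format implements `ResultTypeQueryable`, `env.createInput` can derive the produced type itself; a minimal sketch with placeholder addresses:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KuduRowDataInputFormat inputFormat = new KuduRowDataInputFormat(
        KuduReaderConfig.Builder.setMasters("host1:7051").build(),
        new RowResultRowDataConvertor(),
        KuduTableInfo.forTable("ExampleTable"));

// TypeInformation<RowData> comes from getProducedType(); no extra hint is needed.
DataStream<RowData> rows = env.createInput(inputFormat);
```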
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduRowInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduRowInputFormat.java
new file mode 100644
index 0000000..935de8f
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduRowInputFormat.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+/**
+ * InputFormat based on the row object type
+ */
+@PublicEvolving
+public class KuduRowInputFormat extends AbstractKuduInputFormat<Row> {
+
+    public KuduRowInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<Row> rowResultConvertor,
+                              KuduTableInfo tableInfo) {
+        super(readerConfig, rowResultConvertor, tableInfo);
+    }
+
+    public KuduRowInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<Row> rowResultConvertor,
+                              KuduTableInfo tableInfo, List<String> tableProjections) {
+        super(readerConfig, rowResultConvertor, tableInfo, tableProjections);
+    }
+
+    public KuduRowInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<Row> rowResultConvertor,
+                              KuduTableInfo tableInfo, List<KuduFilterInfo> tableFilters,
+                              List<String> tableProjections) {
+        super(readerConfig, rowResultConvertor, tableInfo, tableFilters, tableProjections);
+    }
+
+    @Override
+    public TypeInformation<Row> getProducedType() {
+        return TypeInformation.of(Row.class);
+    }
+}
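
Note that the constructors now require a convertor, so the older two-argument call shown in the README above no longer applies; a minimal sketch of the updated call, reusing the `readerConfig` and `tableInfo` from that example:

```java
KuduRowInputFormat inputFormat = new KuduRowInputFormat(
        readerConfig, new RowResultRowConvertor(), tableInfo);
```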
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
index d8343e8..734e219 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
@@ -71,8 +71,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Catalog for reading and creating Kudu tables.
+ * @deprecated This class is built on {@link KuduTableFactory}, but Flink has moved to the
+ *      {@link org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory} stack,
+ *      so the TableFactory-based {@link KuduCatalog} would need updating as well.
+ *      It is replaced by the {@link org.apache.flink.connectors.kudu.table.dynamic.catalog.KuduDynamicCatalog} class.
  */
 @PublicEvolving
+@Deprecated
 public class KuduCatalog extends AbstractReadOnlyCatalog {
 
     private static final Logger LOG = LoggerFactory.getLogger(KuduCatalog.class);
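
A hedged migration sketch, assuming `KuduDynamicCatalog` keeps the `(name, masters)` constructor shape of the deprecated catalog (check the class itself for the authoritative signature):

```java
// Hypothetical: swap the deprecated KuduCatalog for KuduDynamicCatalog.
Catalog catalog = new KuduDynamicCatalog("kudu", "host1:7051");
tableEnv.registerCatalog("kudu", catalog);
tableEnv.useCatalog("kudu");
```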
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
deleted file mode 100644
index 2458a56..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.kudu.table;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.factories.CatalogFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Factory for {@link KuduCatalog}.
- */
-@Internal
-public class KuduCatalogFactory implements CatalogFactory {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KuduCatalogFactory.class);
-
-    @Override
-    public Map<String, String> requiredContext() {
-        Map<String, String> context = new HashMap<>();
-        context.put("type", KuduTableFactory.KUDU);
-        context.put("property-version", "1"); // backwards compatibility
-        return context;
-    }
-
-    @Override
-    public List<String> supportedProperties() {
-        List<String> properties = new ArrayList<>();
-
-        properties.add(KuduTableFactory.KUDU_MASTERS);
-
-        return properties;
-    }
-
-    @Override
-    public Catalog createCatalog(String name, Map<String, String> properties) {
-        final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
-        return new KuduCatalog(name,
-            descriptorProperties.getString(KuduTableFactory.KUDU_MASTERS));
-    }
-
-    private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
-        final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
-        descriptorProperties.putProperties(properties);
-        descriptorProperties.validateString(KuduTableFactory.KUDU_MASTERS, false);
-        return descriptorProperties;
-    }
-
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
index a2883af..9112b0a 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.connectors.kudu.table;
 
 import org.apache.flink.api.java.tuple.Tuple2;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
index fea7e73..ad98e86 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
@@ -18,10 +18,12 @@
 package org.apache.flink.connectors.kudu.table;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connectors.kudu.batch.KuduRowInputFormat;
 import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowConvertor;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.format.KuduRowInputFormat;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
@@ -36,8 +38,6 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +52,8 @@ import java.util.Optional;
 import static org.apache.flink.connectors.kudu.table.utils.KuduTableUtils.toKuduFilterInfo;
 
 public class KuduTableSource implements StreamTableSource<Row>,
-    LimitableTableSource<Row>, ProjectableTableSource<Row>, FilterableTableSource<Row> {
+        LimitableTableSource<Row>, ProjectableTableSource<Row>, FilterableTableSource<Row> {
+
 
     private static final Logger LOG = LoggerFactory.getLogger(KuduTableSource.class);
 
@@ -68,7 +69,7 @@ public class KuduTableSource implements StreamTableSource<Row>,
     private KuduRowInputFormat kuduRowInputFormat;
 
     public KuduTableSource(KuduReaderConfig.Builder configBuilder, KuduTableInfo tableInfo,
-        TableSchema flinkSchema, List<KuduFilterInfo> predicates, String[] projectedFields) {
+                           TableSchema flinkSchema, List<KuduFilterInfo> predicates, String[] projectedFields) {
         this.configBuilder = configBuilder;
         this.tableInfo = tableInfo;
         this.flinkSchema = flinkSchema;
@@ -77,9 +78,9 @@ public class KuduTableSource implements StreamTableSource<Row>,
         if (predicates != null && predicates.size() != 0) {
             this.isFilterPushedDown = true;
         }
-        this.kuduRowInputFormat = new KuduRowInputFormat(configBuilder.build(), tableInfo,
-            predicates == null ? Collections.emptyList() : predicates,
-            projectedFields == null ? null : Lists.newArrayList(projectedFields));
+        this.kuduRowInputFormat = new KuduRowInputFormat(configBuilder.build(), new RowResultRowConvertor(), tableInfo,
+                predicates == null ? Collections.emptyList() : predicates,
+                projectedFields == null ? null : Lists.newArrayList(projectedFields));
     }
 
     @Override
@@ -89,12 +90,13 @@ public class KuduTableSource implements StreamTableSource<Row>,
 
     @Override
     public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
-        KuduRowInputFormat inputFormat = new KuduRowInputFormat(configBuilder.build(), tableInfo,
-            predicates == null ? Collections.emptyList() : predicates,
-            projectedFields == null ? null : Lists.newArrayList(projectedFields));
+        KuduRowInputFormat inputFormat = new KuduRowInputFormat(configBuilder.build(), new RowResultRowConvertor(),
+                tableInfo,
+                predicates == null ? Collections.emptyList() : predicates,
+                projectedFields == null ? null : Lists.newArrayList(projectedFields));
         return env.createInput(inputFormat,
-            (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType()))
-            .name(explainSource());
+                        (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType()))
+                .name(explainSource());
     }
 
     @Override
@@ -135,7 +137,7 @@ public class KuduTableSource implements StreamTableSource<Row>,
     @Override
     public TableSource<Row> applyLimit(long l) {
         return new KuduTableSource(configBuilder.setRowLimit((int) l), tableInfo, flinkSchema,
-            predicates, projectedFields);
+                predicates, projectedFields);
     }
 
     @Override
@@ -153,17 +155,17 @@ public class KuduTableSource implements StreamTableSource<Row>,
     public TableSource<Row> applyPredicate(List<Expression> predicates) {
         List<KuduFilterInfo> kuduPredicates = new ArrayList<>();
         ListIterator<Expression> predicatesIter = predicates.listIterator();
-        while(predicatesIter.hasNext()) {
+        while (predicatesIter.hasNext()) {
             Expression predicate = predicatesIter.next();
             Optional<KuduFilterInfo> kuduPred = toKuduFilterInfo(predicate);
             if (kuduPred != null && kuduPred.isPresent()) {
                 LOG.debug("Predicate [{}] converted into KuduFilterInfo and pushed into " +
-                    "KuduTable [{}].", predicate, tableInfo.getName());
+                        "KuduTable [{}].", predicate, tableInfo.getName());
                 kuduPredicates.add(kuduPred.get());
                 predicatesIter.remove();
             } else {
                 LOG.debug("Predicate [{}] could not be pushed into KuduFilterInfo for KuduTable [{}].",
-                    predicate, tableInfo.getName());
+                        predicate, tableInfo.getName());
             }
         }
         return new KuduTableSource(configBuilder, tableInfo, flinkSchema, kuduPredicates, projectedFields);
@@ -172,8 +174,8 @@ public class KuduTableSource implements StreamTableSource<Row>,
     @Override
     public String explainSource() {
         return "KuduTableSource[schema=" + Arrays.toString(getTableSchema().getFieldNames()) +
-            ", filter=" + predicateString() +
-            (projectedFields != null ?", projectFields=" + Arrays.toString(projectedFields) + "]" : "]");
+                ", filter=" + predicateString() +
+                (projectedFields != null ? ", projectFields=" + Arrays.toString(projectedFields) + "]" : "]");
     }
 
     private String predicateString() {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSink.java
new file mode 100644
index 0000000..179c01f
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSink.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table.dynamic;
+
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.connector.writer.RowDataUpsertOperationMapper;
+import org.apache.flink.connectors.kudu.streaming.KuduSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * A {@link KuduDynamicTableSink} for Kudu.
+ */
+public class KuduDynamicTableSink implements DynamicTableSink {
+    private final KuduWriterConfig.Builder writerConfigBuilder;
+    private final TableSchema flinkSchema;
+    private final KuduTableInfo tableInfo;
+
+    public KuduDynamicTableSink(KuduWriterConfig.Builder writerConfigBuilder, TableSchema flinkSchema,
+                                KuduTableInfo tableInfo) {
+        this.writerConfigBuilder = writerConfigBuilder;
+        this.flinkSchema = flinkSchema;
+        this.tableInfo = tableInfo;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        this.validatePrimaryKey(requestedMode);
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.DELETE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .build();
+    }
+
+    private void validatePrimaryKey(ChangelogMode requestedMode) {
+        Preconditions.checkState(
+                ChangelogMode.insertOnly().equals(requestedMode)
+                        || this.tableInfo.getSchema().getPrimaryKeyColumnCount() != 0,
+                "please declare a primary key for the sink table when the query contains update/delete records.");
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        KuduSink<RowData> upsertKuduSink = new KuduSink<>(writerConfigBuilder.build(), tableInfo,
+                new RowDataUpsertOperationMapper(flinkSchema));
+        return SinkFunctionProvider.of(upsertKuduSink);
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return new KuduDynamicTableSink(this.writerConfigBuilder, this.flinkSchema, this.tableInfo);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "kudu";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        KuduDynamicTableSink that = (KuduDynamicTableSink) o;
+        return Objects.equals(writerConfigBuilder, that.writerConfigBuilder) && Objects.equals(flinkSchema,
+                that.flinkSchema) && Objects.equals(tableInfo, that.tableInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(writerConfigBuilder, flinkSchema, tableInfo);
+    }
+}
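
For context, a hedged sketch of driving this sink from SQL (tEnv is a TableEnvironment
as in the catalog sketch above; table names, columns, and the master address are
illustrative, and source_orders is assumed to be defined elsewhere). The declared
primary key is what lets getChangelogMode() accept UPDATE_AFTER and DELETE records:

    tEnv.executeSql(
            "CREATE TABLE orders_sink (" +
            "  order_id BIGINT," +
            "  amount DECIMAL(10, 2)," +
            "  PRIMARY KEY (order_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'kudu'," +
            "  'kudu.masters' = 'localhost:7051'," +
            "  'kudu.table' = 'orders'" +
            ")");
    // an updating query; without the primary key above, validatePrimaryKey() would fail
    tEnv.executeSql(
            "INSERT INTO orders_sink " +
            "SELECT order_id, SUM(amount) FROM source_orders GROUP BY order_id");
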
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
new file mode 100644
index 0000000..2022cd7
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table.dynamic;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowDataConvertor;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.format.KuduRowDataInputFormat;
+import org.apache.flink.connectors.kudu.table.function.lookup.KuduLookupOptions;
+import org.apache.flink.connectors.kudu.table.function.lookup.KuduRowDataLookupFunction;
+import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.kudu.shaded.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A {@link DynamicTableSource} for Kudu.
+ */
+public class KuduDynamicTableSource implements ScanTableSource, SupportsProjectionPushDown,
+        SupportsLimitPushDown, LookupTableSource, SupportsFilterPushDown {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduDynamicTableSource.class);
+    private final KuduTableInfo tableInfo;
+    private final KuduLookupOptions kuduLookupOptions;
+    private final KuduRowDataInputFormat kuduRowDataInputFormat;
+    private final transient List<KuduFilterInfo> predicates = Lists.newArrayList();
+    private KuduReaderConfig.Builder configBuilder;
+    private TableSchema physicalSchema;
+    private String[] projectedFields;
+    private transient List<ResolvedExpression> filters;
+
+    public KuduDynamicTableSource(KuduReaderConfig.Builder configBuilder, KuduTableInfo tableInfo,
+                                  TableSchema physicalSchema, String[] projectedFields,
+                                  KuduLookupOptions kuduLookupOptions) {
+        this.configBuilder = configBuilder;
+        this.tableInfo = tableInfo;
+        this.physicalSchema = physicalSchema;
+        this.projectedFields = projectedFields;
+        this.kuduRowDataInputFormat = new KuduRowDataInputFormat(configBuilder.build(),
+                new RowResultRowDataConvertor(), tableInfo,
+                predicates,
+                projectedFields == null ? null : Lists.newArrayList(projectedFields));
+        this.kuduLookupOptions = kuduLookupOptions;
+    }
+
+    @Override
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+        int keysLen = context.getKeys().length;
+        String[] keyNames = new String[keysLen];
+        for (int i = 0; i < keyNames.length; ++i) {
+            int[] innerKeyArr = context.getKeys()[i];
+            Preconditions.checkArgument(innerKeyArr.length == 1, "Kudu only supports non-nested lookup keys");
+            keyNames[i] = this.physicalSchema.getFieldNames()[innerKeyArr[0]];
+        }
+        KuduRowDataLookupFunction rowDataLookupFunction = KuduRowDataLookupFunction.Builder.options()
+                .keyNames(keyNames)
+                .kuduReaderConfig(configBuilder.build())
+                .projectedFields(projectedFields)
+                .tableInfo(tableInfo)
+                .kuduLookupOptions(kuduLookupOptions)
+                .build();
+        return TableFunctionProvider.of(rowDataLookupFunction);
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+        if (CollectionUtils.isNotEmpty(this.filters)) {
+            for (ResolvedExpression filter : this.filters) {
+                Optional<KuduFilterInfo> kuduFilterInfo = KuduTableUtils.toKuduFilterInfo(filter);
+                if (kuduFilterInfo != null && kuduFilterInfo.isPresent()) {
+                    this.predicates.add(kuduFilterInfo.get());
+                }
+
+            }
+        }
+        KuduRowDataInputFormat inputFormat = new KuduRowDataInputFormat(configBuilder.build(),
+                new RowResultRowDataConvertor(), tableInfo,
+                this.predicates,
+                projectedFields == null ? null : Lists.newArrayList(projectedFields));
+        return InputFormatProvider.of(inputFormat);
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new KuduDynamicTableSource(this.configBuilder, this.tableInfo, this.physicalSchema,
+                this.projectedFields, this.kuduLookupOptions);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "kudu";
+    }
+
+    @Override
+    public boolean supportsNestedProjection() {
+        // the planner doesn't support nested projection push-down yet.
+        return false;
+    }
+
+    @Override
+    public void applyProjection(int[][] projectedFields) {
+        // apply the projection to the physical schema and keep the projected field names
+        this.physicalSchema = TableSchemaUtils.projectSchema(this.physicalSchema, projectedFields);
+        this.projectedFields = physicalSchema.getFieldNames();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        KuduDynamicTableSource that = (KuduDynamicTableSource) o;
+        return Objects.equals(configBuilder, that.configBuilder)
+                && Objects.equals(tableInfo, that.tableInfo)
+                && Objects.equals(physicalSchema, that.physicalSchema)
+                && Arrays.equals(projectedFields, that.projectedFields)
+                && Objects.equals(kuduLookupOptions, that.kuduLookupOptions)
+                && Objects.equals(kuduRowDataInputFormat, that.kuduRowDataInputFormat)
+                && Objects.equals(filters, that.filters)
+                && Objects.equals(predicates, that.predicates);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(configBuilder, tableInfo, physicalSchema,
+                kuduLookupOptions, kuduRowDataInputFormat, filters, predicates);
+        result = 31 * result + Arrays.hashCode(projectedFields);
+        return result;
+    }
+
+    @Override
+    public void applyLimit(long limit) {
+        this.configBuilder = this.configBuilder.setRowLimit((int) limit);
+    }
+
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
+        this.filters = filters;
+        return Result.of(Collections.emptyList(), filters);
+    }
+}
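
Because the source implements LookupTableSource, it can serve as the lookup side of a
temporal join; a hedged SQL sketch (table and column names are illustrative, orders is
assumed to carry a processing-time attribute proc_time, and per the precondition above
the lookup keys must be plain, non-nested columns):

    tEnv.executeSql(
            "SELECT o.order_id, c.customer_name " +
            "FROM orders AS o " +
            "JOIN kudu_customers FOR SYSTEM_TIME AS OF o.proc_time AS c " +
            "ON o.customer_id = c.customer_id");
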
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSourceSinkFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSourceSinkFactory.java
new file mode 100644
index 0000000..6fab41f
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSourceSinkFactory.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table.dynamic;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.table.function.lookup.KuduLookupOptions;
+import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.kudu.shaded.com.google.common.collect.Sets;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Factory for creating configured instances of {@link KuduDynamicTableSource}/{@link KuduDynamicTableSink} in
+ * a stream environment.
+ */
+public class KuduDynamicTableSourceSinkFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+    public static final String IDENTIFIER = "kudu";
+    public static final ConfigOption<String> KUDU_TABLE = ConfigOptions
+            .key("kudu.table")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("kudu's table name");
+
+    public static final ConfigOption<String> KUDU_MASTERS =
+            ConfigOptions
+                    .key("kudu.masters")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("kudu's master server address");
+
+
+    public static final ConfigOption<String> KUDU_HASH_COLS =
+            ConfigOptions
+                    .key("kudu.hash-columns")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("kudu's hash columns");
+
+    public static final ConfigOption<Integer> KUDU_REPLICAS =
+            ConfigOptions
+                    .key("kudu.replicas")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("kudu's replica nums");
+
+    public static final ConfigOption<Integer> KUDU_MAX_BUFFER_SIZE =
+            ConfigOptions
+                    .key("kudu.max-buffer-size")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("kudu's max buffer size");
+
+    public static final ConfigOption<Integer> KUDU_FLUSH_INTERVAL =
+            ConfigOptions
+                    .key("kudu.flush-interval")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("kudu's data flush interval");
+
+    public static final ConfigOption<Long> KUDU_OPERATION_TIMEOUT =
+            ConfigOptions
+                    .key("kudu.operation-timeout")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("kudu's operation timeout");
+
+    public static final ConfigOption<Boolean> KUDU_IGNORE_NOT_FOUND =
+            ConfigOptions
+                    .key("kudu.ignore-not-found")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("if true, ignore all not found rows");
+
+    public static final ConfigOption<Boolean> KUDU_IGNORE_DUPLICATE =
+            ConfigOptions
+                    .key("kudu.ignore-duplicate")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("if true, ignore all duplicate rows");
+
+    /**
+     * hash partition bucket nums
+     */
+    public static final ConfigOption<Integer> KUDU_HASH_PARTITION_NUMS =
+            ConfigOptions
+                    .key("kudu.hash-partition-nums")
+                    .intType()
+                    .defaultValue(KUDU_REPLICAS.defaultValue() * 2)
+                    .withDescription("kudu's hash partition bucket nums, defaultValue is 2 * replica nums");
+
+    public static final ConfigOption<String> KUDU_PRIMARY_KEY_COLS =
+            ConfigOptions
+                    .key("kudu.primary-key-columns")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("kudu's primary key, primary key must be ordered");
+
+
+    public static final ConfigOption<Integer> KUDU_SCAN_ROW_SIZE =
+            ConfigOptions
+                    .key("kudu.scan.row-size")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription("kudu's scan row size");
+
+    /**
+     * lookup cache config
+     */
+    public static final ConfigOption<Long> KUDU_LOOKUP_CACHE_MAX_ROWS =
+            ConfigOptions
+                    .key("kudu.lookup.cache.max-rows")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription("the max number of rows of lookup cache, over this value, the oldest rows will " +
+                            "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any" +
+                            " of them is " +
+                            "specified. Cache is not enabled as default.");
+
+    public static final ConfigOption<Long> KUDU_LOOKUP_CACHE_TTL =
+            ConfigOptions
+                    .key("kudu.lookup.cache.ttl")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription("the cache time to live.");
+
+    public static final ConfigOption<Integer> KUDU_LOOKUP_MAX_RETRIES =
+            ConfigOptions
+                    .key("kudu.lookup.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("the max retry times if lookup database failed.");
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        ReadableConfig config = getReadableConfig(context);
+        String masterAddresses = config.get(KUDU_MASTERS);
+        String tableName = config.get(KUDU_TABLE);
+        Optional<Long> operationTimeout = config.getOptional(KUDU_OPERATION_TIMEOUT);
+        Optional<Integer> flushInterval = config.getOptional(KUDU_FLUSH_INTERVAL);
+        Optional<Integer> bufferSize = config.getOptional(KUDU_MAX_BUFFER_SIZE);
+        Optional<Boolean> ignoreNotFound = config.getOptional(KUDU_IGNORE_NOT_FOUND);
+        Optional<Boolean> ignoreDuplicate = config.getOptional(KUDU_IGNORE_DUPLICATE);
+        TableSchema schema = context.getCatalogTable().getSchema();
+        TableSchema physicalSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
+        KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, schema,
+                context.getCatalogTable().toProperties());
+
+        KuduWriterConfig.Builder configBuilder = KuduWriterConfig.Builder
+                .setMasters(masterAddresses);
+        operationTimeout.ifPresent(configBuilder::setOperationTimeout);
+        flushInterval.ifPresent(configBuilder::setFlushInterval);
+        bufferSize.ifPresent(configBuilder::setMaxBufferSize);
+        ignoreNotFound.ifPresent(configBuilder::setIgnoreNotFound);
+        ignoreDuplicate.ifPresent(configBuilder::setIgnoreDuplicate);
+        return new KuduDynamicTableSink(configBuilder, physicalSchema, tableInfo);
+    }
+
+    /**
+     * Gets the validated {@link ReadableConfig} derived from the factory context.
+     *
+     * @param context the table factory context
+     * @return the validated {@link ReadableConfig}
+     */
+    private ReadableConfig getReadableConfig(Context context) {
+        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        return helper.getOptions();
+    }
+
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        ReadableConfig config = getReadableConfig(context);
+        String masterAddresses = config.get(KUDU_MASTERS);
+
+        int scanRowSize = config.get(KUDU_SCAN_ROW_SIZE);
+        long kuduCacheMaxRows = config.get(KUDU_LOOKUP_CACHE_MAX_ROWS);
+        long kuduCacheTtl = config.get(KUDU_LOOKUP_CACHE_TTL);
+        int kuduMaxRetries = config.get(KUDU_LOOKUP_MAX_RETRIES);
+
+        // build kudu lookup options
+        KuduLookupOptions kuduLookupOptions = KuduLookupOptions.Builder.options().withCacheMaxSize(kuduCacheMaxRows)
+                .withCacheExpireMs(kuduCacheTtl)
+                .withMaxRetryTimes(kuduMaxRetries)
+                .build();
+
+        TableSchema schema = context.getCatalogTable().getSchema();
+        TableSchema physicalSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
+        KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(config.get(KUDU_TABLE), schema,
+                context.getCatalogTable().toProperties());
+
+        KuduReaderConfig.Builder configBuilder = KuduReaderConfig.Builder
+                .setMasters(masterAddresses)
+                .setRowLimit(scanRowSize);
+        return new KuduDynamicTableSource(configBuilder, tableInfo, physicalSchema, physicalSchema.getFieldNames(),
+                kuduLookupOptions);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Sets.newHashSet(KUDU_TABLE, KUDU_MASTERS);
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Sets.newHashSet(KUDU_HASH_COLS, KUDU_HASH_PARTITION_NUMS,
+                KUDU_PRIMARY_KEY_COLS, KUDU_SCAN_ROW_SIZE, KUDU_REPLICAS,
+                KUDU_MAX_BUFFER_SIZE, KUDU_FLUSH_INTERVAL, KUDU_OPERATION_TIMEOUT,
+                KUDU_IGNORE_NOT_FOUND, KUDU_IGNORE_DUPLICATE,
+                //lookup
+                KUDU_LOOKUP_CACHE_MAX_ROWS, KUDU_LOOKUP_CACHE_TTL, KUDU_LOOKUP_MAX_RETRIES);
+    }
+}
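
Putting the factory options together, a hedged example DDL (all values are
illustrative; the lookup cache is only enabled when both cache options are set, and
tEnv is a TableEnvironment as in the earlier sketches):

    tEnv.executeSql(
            "CREATE TABLE kudu_customers (" +
            "  customer_id BIGINT," +
            "  customer_name STRING," +
            "  PRIMARY KEY (customer_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'kudu'," +
            "  'kudu.masters' = 'master1:7051,master2:7051'," +
            "  'kudu.table' = 'customers'," +
            "  'kudu.hash-columns' = 'customer_id'," +
            "  'kudu.replicas' = '3'," +
            "  'kudu.hash-partition-nums' = '6'," +
            "  'kudu.lookup.cache.max-rows' = '10000'," +
            "  'kudu.lookup.cache.ttl' = '60000'," +
            "  'kudu.lookup.max-retries' = '3'" +
            ")");
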
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduCatalogFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduCatalogFactory.java
new file mode 100644
index 0000000..8ad9350
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduCatalogFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table.dynamic.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.IDENTIFIER;
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_MASTERS;
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
+
+/**
+ * Factory for {@link KuduDynamicCatalog}.
+ */
+@Internal
+public class KuduCatalogFactory implements CatalogFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduCatalogFactory.class);
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PROPERTY_VERSION);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(KUDU_MASTERS);
+        return options;
+    }
+
+    @Override
+    public Catalog createCatalog(Context context) {
+        final FactoryUtil.CatalogFactoryHelper helper =
+                FactoryUtil.createCatalogFactoryHelper(this, context);
+        helper.validate();
+        return new KuduDynamicCatalog(context.getName(),
+                helper.getOptions().get(KUDU_MASTERS));
+    }
+
+}
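
A hedged SQL sketch for the new catalog factory (the catalog name and master address
are illustrative; 'type' selects the catalog factory by its identifier):

    tEnv.executeSql(
            "CREATE CATALOG my_kudu WITH (" +
            "  'type' = 'kudu'," +
            "  'kudu.masters' = 'localhost:7051'" +
            ")");
    tEnv.executeSql("USE CATALOG my_kudu");
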
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java
similarity index 85%
copy from flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
copy to flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java
index d8343e8..b531835 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java
@@ -15,11 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.connectors.kudu.table.dynamic.catalog;
 
-package org.apache.flink.connectors.kudu.table;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.table.AbstractReadOnlyCatalog;
+import org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory;
 import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableSchema;
@@ -39,16 +40,14 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.factories.Factory;
 import org.apache.flink.util.StringUtils;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.client.AlterTableOptions;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.shaded.com.google.common.collect.Lists;
 import org.apache.kudu.shaded.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,49 +62,50 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_HASH_COLS;
-import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_PRIMARY_KEY_COLS;
-import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_REPLICAS;
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_COLS;
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_PARTITION_NUMS;
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_PRIMARY_KEY_COLS;
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_REPLICAS;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Catalog for reading and creating Kudu tables.
  */
-@PublicEvolving
-public class KuduCatalog extends AbstractReadOnlyCatalog {
+public class KuduDynamicCatalog extends AbstractReadOnlyCatalog {
 
-    private static final Logger LOG = LoggerFactory.getLogger(KuduCatalog.class);
-    private final KuduTableFactory tableFactory = new KuduTableFactory();
+    private static final Logger LOG = LoggerFactory.getLogger(KuduDynamicCatalog.class);
+    private final KuduDynamicTableSourceSinkFactory tableFactory = new KuduDynamicTableSourceSinkFactory();
     private final String kuduMasters;
-    private KuduClient kuduClient;
+    private final KuduClient kuduClient;
 
     /**
-     * Create a new {@link KuduCatalog} with the specified catalog name and kudu master addresses.
+     * Create a new {@link KuduDynamicCatalog} with the specified catalog name and kudu master addresses.
      *
      * @param catalogName Name of the catalog (used by the table environment)
      * @param kuduMasters Connection address to Kudu
      */
-    public KuduCatalog(String catalogName, String kuduMasters) {
+    public KuduDynamicCatalog(String catalogName, String kuduMasters) {
         super(catalogName, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE);
         this.kuduMasters = kuduMasters;
         this.kuduClient = createClient();
     }
 
     /**
-     * Create a new {@link KuduCatalog} with the specified kudu master addresses.
+     * Create a new {@link KuduDynamicCatalog} with the specified kudu master addresses.
      *
      * @param kuduMasters Connection address to Kudu
      */
-    public KuduCatalog(String kuduMasters) {
+    public KuduDynamicCatalog(String kuduMasters) {
         this("kudu", kuduMasters);
     }
 
-    public Optional<TableFactory> getTableFactory() {
+    @Override
+    public Optional<Factory> getFactory() {
         return Optional.of(getKuduTableFactory());
     }
 
-    public KuduTableFactory getKuduTableFactory() {
+    public KuduDynamicTableSourceSinkFactory getKuduTableFactory() {
         return tableFactory;
     }
 
@@ -114,7 +114,8 @@ public class KuduCatalog extends AbstractReadOnlyCatalog {
     }
 
     @Override
-    public void open() {}
+    public void open() {
+    }
 
     @Override
     public void close() {
@@ -168,7 +169,7 @@ public class KuduCatalog extends AbstractReadOnlyCatalog {
 
         try {
             KuduTable kuduTable = kuduClient.openTable(tableName);
-
+            // FIXME: still based on TableSchema; should be upgraded to ResolvedSchema
             CatalogTableImpl table = new CatalogTableImpl(
                     KuduTableUtils.kuduToFlinkSchema(kuduTable.getSchema()),
                     createTableProperties(tableName, kuduTable.getSchema().getPrimaryKeyColumns()),
@@ -182,10 +183,10 @@ public class KuduCatalog extends AbstractReadOnlyCatalog {
 
     protected Map<String, String> createTableProperties(String tableName, List<ColumnSchema> primaryKeyColumns) {
         Map<String, String> props = new HashMap<>();
-        props.put(KuduTableFactory.KUDU_MASTERS, kuduMasters);
+        props.put(KuduDynamicTableSourceSinkFactory.KUDU_MASTERS.key(), kuduMasters);
         String primaryKeyNames = primaryKeyColumns.stream().map(ColumnSchema::getName).collect(Collectors.joining(","));
-        props.put(KuduTableFactory.KUDU_PRIMARY_KEY_COLS, primaryKeyNames);
-        props.put(KuduTableFactory.KUDU_TABLE, tableName);
+        props.put(KUDU_PRIMARY_KEY_COLS.key(), primaryKeyNames);
+        props.put(KuduDynamicTableSourceSinkFactory.KUDU_TABLE.key(), tableName);
         return props;
     }
 
@@ -240,11 +241,12 @@ public class KuduCatalog extends AbstractReadOnlyCatalog {
         Map<String, String> tableProperties = table.getOptions();
         TableSchema tableSchema = table.getSchema();
 
-        Set<String> optionalProperties = new HashSet<>(Arrays.asList(KUDU_REPLICAS));
-        Set<String> requiredProperties = new HashSet<>(Arrays.asList(KUDU_HASH_COLS));
+        Set<String> optionalProperties = new HashSet<>(Arrays.asList(KUDU_REPLICAS.key(),
+                KUDU_HASH_PARTITION_NUMS.key(), KUDU_HASH_COLS.key()));
+        Set<String> requiredProperties = new HashSet<>();
 
         if (!tableSchema.getPrimaryKey().isPresent()) {
-            requiredProperties.add(KUDU_PRIMARY_KEY_COLS);
+            requiredProperties.add(KUDU_PRIMARY_KEY_COLS.key());
         }
 
         if (!tableProperties.keySet().containsAll(requiredProperties)) {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduLookupOptions.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduLookupOptions.java
new file mode 100644
index 0000000..9a2f23f
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduLookupOptions.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table.function.lookup;
+
+/**
+ * Options for the Kudu lookup.
+ */
+public class KuduLookupOptions {
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public KuduLookupOptions(long cacheMaxSize, long cacheExpireMs, int maxRetryTimes) {
+        this.cacheMaxSize = cacheMaxSize;
+        this.cacheExpireMs = cacheExpireMs;
+        this.maxRetryTimes = maxRetryTimes;
+    }
+
+    public long getCacheMaxSize() {
+        return cacheMaxSize;
+    }
+
+
+    public long getCacheExpireMs() {
+        return cacheExpireMs;
+    }
+
+
+    public int getMaxRetryTimes() {
+        return maxRetryTimes;
+    }
+
+
+    public static final class Builder {
+        private long cacheMaxSize;
+        private long cacheExpireMs;
+        private int maxRetryTimes;
+
+        public static Builder options() {
+            return new Builder();
+        }
+
+        public Builder withCacheMaxSize(long cacheMaxSize) {
+            this.cacheMaxSize = cacheMaxSize;
+            return this;
+        }
+
+        public Builder withCacheExpireMs(long cacheExpireMs) {
+            this.cacheExpireMs = cacheExpireMs;
+            return this;
+        }
+
+        public Builder withMaxRetryTimes(int maxRetryTimes) {
+            this.maxRetryTimes = maxRetryTimes;
+            return this;
+        }
+
+        public KuduLookupOptions build() {
+            return new KuduLookupOptions(cacheMaxSize, cacheExpireMs, maxRetryTimes);
+        }
+    }
+}
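
A small usage sketch of the builder (values are illustrative; a value of -1 for either
cache option leaves the cache disabled, matching how KuduRowDataLookupFunction
interprets these options):

    KuduLookupOptions lookupOptions = KuduLookupOptions.Builder.options()
            .withCacheMaxSize(10_000L)
            .withCacheExpireMs(60_000L)
            .withMaxRetryTimes(3)
            .build();
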
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
new file mode 100644
index 0000000..4a4a952
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table.function.lookup;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowDataConvertor;
+import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * LookupFunction based on the RowData object type
+ */
+public class KuduRowDataLookupFunction extends TableFunction<RowData> {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(KuduRowDataLookupFunction.class);
+
+    private final KuduTableInfo tableInfo;
+    private final KuduReaderConfig kuduReaderConfig;
+    private final String[] keyNames;
+    private final String[] projectedFields;
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private final RowResultConvertor<RowData> convertor;
+
+    private transient Cache<RowData, List<RowData>> cache;
+    private transient KuduReader<RowData> kuduReader;
+
+    private KuduRowDataLookupFunction(String[] keyNames, KuduTableInfo tableInfo, KuduReaderConfig kuduReaderConfig,
+                                      String[] projectedFields, KuduLookupOptions kuduLookupOptions) {
+        this.tableInfo = tableInfo;
+        this.convertor = new RowResultRowDataConvertor();
+        this.projectedFields = projectedFields;
+        this.keyNames = keyNames;
+        this.kuduReaderConfig = kuduReaderConfig;
+        this.cacheMaxSize = kuduLookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = kuduLookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = kuduLookupOptions.getMaxRetryTimes();
+    }
+
+    public RowData buildCacheKey(Object... keys) {
+        return GenericRowData.of(keys);
+    }
+
+    /**
+     * Entry point of the lookup function, invoked once per set of join keys.
+     *
+     * @param keys join keys
+     */
+    public void eval(Object... keys) {
+        if (keys.length != keyNames.length) {
+            throw new RuntimeException("The join keys are of unequal lengths");
+        }
+        // cache key
+        RowData keyRow = buildCacheKey(keys);
+        if (this.cache != null) {
+            List<RowData> cacheRows = this.cache.getIfPresent(keyRow);
+            if (CollectionUtils.isNotEmpty(cacheRows)) {
+                for (RowData cacheRow : cacheRows) {
+                    collect(cacheRow);
+                }
+                return;
+            }
+        }
+
+        for (int retry = 1; retry <= maxRetryTimes; retry++) {
+            try {
+                List<KuduFilterInfo> kuduFilterInfos = buildKuduFilterInfo(keys);
+                this.kuduReader.setTableFilters(kuduFilterInfos);
+                KuduInputSplit[] inputSplits = kuduReader.createInputSplits(1);
+                ArrayList<RowData> rows = new ArrayList<>();
+                for (KuduInputSplit inputSplit : inputSplits) {
+                    KuduReaderIterator<RowData> scanner = kuduReader.scanner(inputSplit.getScanToken());
+                    // cache is not enabled
+                    if (cache == null) {
+                        while (scanner.hasNext()) {
+                            collect(scanner.next());
+                        }
+                    } else {
+                        while (scanner.hasNext()) {
+                            RowData row = scanner.next();
+                            rows.add(row);
+                            collect(row);
+                        }
+                        rows.trimToSize();
+                    }
+                }
+                if (cache != null) {
+                    cache.put(keyRow, rows);
+                }
+                break;
+            } catch (Exception e) {
+                LOG.error(String.format("Kudu scan error, retry times = %d", retry), e);
+                if (retry >= maxRetryTimes) {
+                    throw new RuntimeException("Execution of Kudu scan failed.", e);
+                }
+                try {
+                    Thread.sleep(1000L * retry);
+                } catch (InterruptedException e1) {
+                    Thread.currentThread().interrupt(); // restore the interrupt flag
+                    throw new RuntimeException(e1);
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds equality Kudu filters from the lookup key values.
+     *
+     * @param keyValS the lookup key values, in the same order as {@code keyNames}
+     * @return kudu filters
+     */
+    private List<KuduFilterInfo> buildKuduFilterInfo(Object... keyValS) {
+        List<KuduFilterInfo> kuduFilterInfos = Lists.newArrayList();
+        for (int i = 0; i < keyNames.length; i++) {
+            KuduFilterInfo kuduFilterInfo = KuduFilterInfo.Builder.create(keyNames[i])
+                    .equalTo(keyValS[i]).build();
+            kuduFilterInfos.add(kuduFilterInfo);
+        }
+        return kuduFilterInfos;
+    }
+
+
+    @Override
+    public void open(FunctionContext context) {
+        try {
+            super.open(context);
+            this.kuduReader = new KuduReader<>(this.tableInfo, this.kuduReaderConfig, this.convertor);
+            // build kudu cache
+            this.kuduReader.setTableProjections(ArrayUtils.isNotEmpty(projectedFields) ?
+                    Arrays.asList(projectedFields) : null);
+            this.cache = this.cacheMaxSize == -1 || this.cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
+                    .expireAfterWrite(this.cacheExpireMs, TimeUnit.MILLISECONDS)
+                    .maximumSize(this.cacheMaxSize)
+                    .build();
+        } catch (Exception ioe) {
+            LOG.error("Exception while creating connection to Kudu.", ioe);
+            throw new RuntimeException("Cannot create connection to Kudu.", ioe);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (null != this.kuduReader) {
+            try {
+                this.kuduReader.close();
+                if (null != this.cache) {
+                    this.cache.cleanUp();
+                }
+                // help gc
+                this.cache = null;
+                this.kuduReader = null;
+            } catch (IOException e) {
+                // ignore exceptions on close.
+                LOG.warn("Exception while closing the Kudu reader", e);
+            }
+        }
+    }
+
+    public static class Builder {
+        private KuduTableInfo tableInfo;
+        private KuduReaderConfig kuduReaderConfig;
+        private String[] keyNames;
+        private String[] projectedFields;
+        private KuduLookupOptions kuduLookupOptions;
+
+        public static Builder options() {
+            return new Builder();
+        }
+
+        public Builder tableInfo(KuduTableInfo tableInfo) {
+            this.tableInfo = tableInfo;
+            return this;
+        }
+
+        public Builder kuduReaderConfig(KuduReaderConfig kuduReaderConfig) {
+            this.kuduReaderConfig = kuduReaderConfig;
+            return this;
+        }
+
+        public Builder keyNames(String[] keyNames) {
+            this.keyNames = keyNames;
+            return this;
+        }
+
+        public Builder projectedFields(String[] projectedFields) {
+            this.projectedFields = projectedFields;
+            return this;
+        }
+
+        public Builder kuduLookupOptions(KuduLookupOptions kuduLookupOptions) {
+            this.kuduLookupOptions = kuduLookupOptions;
+            return this;
+        }
+
+        public KuduRowDataLookupFunction build() {
+            return new KuduRowDataLookupFunction(keyNames, tableInfo, kuduReaderConfig, projectedFields,
+                    kuduLookupOptions);
+        }
+    }
+}
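
A hedged sketch of wiring the lookup function up by hand, mirroring what
KuduDynamicTableSource.getLookupRuntimeProvider does above (key, table, and master
values are illustrative):

    KuduRowDataLookupFunction lookup = KuduRowDataLookupFunction.Builder.options()
            .keyNames(new String[]{"customer_id"})
            .tableInfo(KuduTableInfo.forTable("customers"))
            .kuduReaderConfig(KuduReaderConfig.Builder.setMasters("localhost:7051").build())
            .projectedFields(new String[]{"customer_id", "customer_name"})
            .kuduLookupOptions(KuduLookupOptions.Builder.options()
                    .withCacheMaxSize(10_000L)
                    .withCacheExpireMs(60_000L)
                    .withMaxRetryTimes(3)
                    .build())
            .build();
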
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
index aa9b34e..1d5be62 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.connectors.kudu.table.utils;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -22,7 +21,7 @@ import org.apache.flink.connectors.kudu.connector.ColumnSchemasFactory;
 import org.apache.flink.connectors.kudu.connector.CreateTableOptionsFactory;
 import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-import org.apache.flink.connectors.kudu.table.KuduTableFactory;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
@@ -34,9 +33,6 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.utils.TableSchemaUtils;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
@@ -54,8 +50,11 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_HASH_COLS;
-import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_PRIMARY_KEY_COLS;
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_COLS;
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_PARTITION_NUMS;
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_PRIMARY_KEY_COLS;
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_REPLICAS;
+
 
 public class KuduTableUtils {
 
@@ -63,8 +62,7 @@ public class KuduTableUtils {
 
     public static KuduTableInfo createTableInfo(String tableName, TableSchema schema, Map<String, String> props) {
-        // Since KUDU_HASH_COLS is a required property for table creation, we use it to infer whether to create table
-        boolean createIfMissing = props.containsKey(KUDU_HASH_COLS);
-
+        // A declared primary key (via property or schema) tells us whether to create the table if it is missing
+        boolean createIfMissing = props.containsKey(KUDU_PRIMARY_KEY_COLS.key()) || schema.getPrimaryKey().isPresent();
         KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);
 
         if (createIfMissing) {
@@ -77,34 +75,35 @@ public class KuduTableUtils {
 
             List<String> keyColumns = getPrimaryKeyColumns(props, schema);
             ColumnSchemasFactory schemasFactory = () -> toKuduConnectorColumns(columns, keyColumns);
-            List<String> hashColumns = getHashColumns(props);
-            int replicas = Optional.ofNullable(props.get(KuduTableFactory.KUDU_REPLICAS)).map(Integer::parseInt).orElse(1);
-
+            int replicas = Optional.ofNullable(props.get(KUDU_REPLICAS.key())).map(Integer::parseInt).orElse(1);
+            // if the hash partition count is not configured, default to 3
+            int hashPartitionNums =
+                    Optional.ofNullable(props.get(KUDU_HASH_PARTITION_NUMS.key())).map(Integer::parseInt).orElse(3);
             CreateTableOptionsFactory optionsFactory = () -> new CreateTableOptions()
                     .setNumReplicas(replicas)
-                    .addHashPartitions(hashColumns, replicas * 2);
-
+                    .addHashPartitions(getHashColumns(props), hashPartitionNums);
             tableInfo.createTableIfNotExists(schemasFactory, optionsFactory);
         } else {
-            LOG.debug("Property {} is missing, assuming the table is already created.", KUDU_HASH_COLS);
+            LOG.debug("Property {} is missing and the schema declares no primary key, assuming the table already exists.", KUDU_PRIMARY_KEY_COLS.key());
         }
 
         return tableInfo;
     }
 
-    public static List<ColumnSchema> toKuduConnectorColumns(List<Tuple2<String, DataType>> columns, Collection<String> keyColumns) {
+    public static List<ColumnSchema> toKuduConnectorColumns(List<Tuple2<String, DataType>> columns,
+                                                            Collection<String> keyColumns) {
         return columns.stream()
                 .map(t -> {
                             ColumnSchema.ColumnSchemaBuilder builder = new ColumnSchema
                                     .ColumnSchemaBuilder(t.f0, KuduTypeUtils.toKuduType(t.f1))
                                     .key(keyColumns.contains(t.f0))
                                     .nullable(!keyColumns.contains(t.f0) && t.f1.getLogicalType().isNullable());
-                            if(t.f1.getLogicalType() instanceof DecimalType) {
+                            if (t.f1.getLogicalType() instanceof DecimalType) {
                                 DecimalType decimalType = ((DecimalType) t.f1.getLogicalType());
                                 builder.typeAttributes(new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
-                                    .precision(decimalType.getPrecision())
-                                    .scale(decimalType.getScale())
-                                    .build());
+                                        .precision(decimalType.getPrecision())
+                                        .scale(decimalType.getScale())
+                                        .build());
                             }
                             return builder.build();
                         }
@@ -123,13 +122,16 @@ public class KuduTableUtils {
     }
 
     public static List<String> getPrimaryKeyColumns(Map<String, String> tableProperties, TableSchema tableSchema) {
-        return tableProperties.containsKey(KUDU_PRIMARY_KEY_COLS) ? Arrays.asList(tableProperties.get(KUDU_PRIMARY_KEY_COLS).split(",")) : tableSchema.getPrimaryKey().get().getColumns();
+        return tableProperties.containsKey(KUDU_PRIMARY_KEY_COLS.key()) ?
+                Arrays.asList(tableProperties.get(KUDU_PRIMARY_KEY_COLS.key()).split(",")) :
+                tableSchema.getPrimaryKey().get().getColumns();
     }
 
     public static List<String> getHashColumns(Map<String, String> tableProperties) {
-        return Lists.newArrayList(tableProperties.get(KUDU_HASH_COLS).split(","));
+        return Lists.newArrayList(tableProperties.get(KUDU_HASH_COLS.key()).split(","));
     }
 
+
     public static TableSchema getSchemaWithSqlTimestamp(TableSchema schema) {
         TableSchema.Builder builder = new TableSchema.Builder();
         TableSchemaUtils.getPhysicalSchema(schema).getTableColumns().forEach(
@@ -149,7 +151,7 @@ public class KuduTableUtils {
     @Nullable
     public static Optional<KuduFilterInfo> toKuduFilterInfo(Expression predicate) {
         LOG.debug("predicate summary: [{}], class: [{}], children: [{}]",
-            predicate.asSummaryString(), predicate.getClass(), predicate.getChildren());
+                predicate.asSummaryString(), predicate.getClass(), predicate.getChildren());
         if (predicate instanceof CallExpression) {
             CallExpression callExpression = (CallExpression) predicate;
             FunctionDefinition functionDefinition = callExpression.getFunctionDefinition();
@@ -157,7 +159,7 @@ public class KuduTableUtils {
             if (children.size() == 1) {
                 return convertUnaryIsNullExpression(functionDefinition, children);
             } else if (children.size() == 2 &&
-                !functionDefinition.equals(BuiltInFunctionDefinitions.OR)) {
+                    !functionDefinition.equals(BuiltInFunctionDefinitions.OR)) {
                 return convertBinaryComparison(functionDefinition, children);
             } else if (children.size() > 0 && functionDefinition.equals(BuiltInFunctionDefinitions.OR)) {
                 return convertIsInExpression(children);
@@ -175,7 +177,7 @@ public class KuduTableUtils {
     }
 
     private static Optional<KuduFilterInfo> convertUnaryIsNullExpression(
-        FunctionDefinition functionDefinition, List<Expression> children) {
+            FunctionDefinition functionDefinition, List<Expression> children) {
         FieldReferenceExpression fieldReferenceExpression;
         if (isFieldReferenceExpression(children.get(0))) {
             fieldReferenceExpression = (FieldReferenceExpression) children.get(0);
@@ -194,15 +196,15 @@ public class KuduTableUtils {
     }
 
     private static Optional<KuduFilterInfo> convertBinaryComparison(
-        FunctionDefinition functionDefinition, List<Expression> children) {
+            FunctionDefinition functionDefinition, List<Expression> children) {
         FieldReferenceExpression fieldReferenceExpression;
         ValueLiteralExpression valueLiteralExpression;
         if (isFieldReferenceExpression(children.get(0)) &&
-            isValueLiteralExpression(children.get(1))) {
+                isValueLiteralExpression(children.get(1))) {
             fieldReferenceExpression = (FieldReferenceExpression) children.get(0);
             valueLiteralExpression = (ValueLiteralExpression) children.get(1);
         } else if (isValueLiteralExpression(children.get(0)) &&
-            isFieldReferenceExpression(children.get(1))) {
+                isFieldReferenceExpression(children.get(1))) {
             fieldReferenceExpression = (FieldReferenceExpression) children.get(1);
             valueLiteralExpression = (ValueLiteralExpression) children.get(0);
         } else {
@@ -243,8 +245,8 @@ public class KuduTableUtils {
                 FieldReferenceExpression fieldReferenceExpression;
                 ValueLiteralExpression valueLiteralExpression;
                 if (functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS) &&
-                    subChildren.size() == 2 && isFieldReferenceExpression(subChildren.get(0)) &&
-                    isValueLiteralExpression(subChildren.get(1))) {
+                        subChildren.size() == 2 && isFieldReferenceExpression(subChildren.get(0)) &&
+                        isValueLiteralExpression(subChildren.get(1))) {
                     fieldReferenceExpression = (FieldReferenceExpression) subChildren.get(0);
                     valueLiteralExpression = (ValueLiteralExpression) subChildren.get(1);
                     String fieldName = fieldReferenceExpression.getName();
@@ -254,13 +256,13 @@ public class KuduTableUtils {
                         columnName = fieldName;
                     }
                     Object value = extractValueLiteral(fieldReferenceExpression,
-                        valueLiteralExpression);
+                            valueLiteralExpression);
                     if (value == null) {
                         return Optional.empty();
                     }
                     values.add(i, value);
                 } else {
-                   return Optional.empty();
+                    return Optional.empty();
                 }
             } else {
                 return Optional.empty();
@@ -271,7 +273,7 @@ public class KuduTableUtils {
     }
 
     private static Object extractValueLiteral(FieldReferenceExpression fieldReferenceExpression,
-        ValueLiteralExpression valueLiteralExpression) {
+                                              ValueLiteralExpression valueLiteralExpression) {
         DataType fieldType = fieldReferenceExpression.getOutputDataType();
         return valueLiteralExpression.getValueAs(fieldType.getConversionClass()).orElse(null);
     }
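
To make the revised creation path concrete, here is a hedged sketch of driving
createTableInfo. Of the property keys, 'kudu.primary-key-columns' appears in the tests
below; the spellings of the hash-column, hash-partition and replica keys are assumptions
about the factory constants imported above:

    // Sketch only: keys other than kudu.primary-key-columns are assumed spellings.
    Map<String, String> props = new HashMap<>();
    props.put("kudu.primary-key-columns", "id");  // presence triggers createIfMissing
    props.put("kudu.hash-columns", "id");         // hash partition columns
    props.put("kudu.hash-partition-nums", "3");   // optional, defaults to 3
    props.put("kudu.replicas", "1");              // optional, defaults to 1

    TableSchema schema = TableSchema.builder()
            .field("id", DataTypes.INT())
            .field("title", DataTypes.STRING())
            .build();

    // Table creation itself is deferred: the factories run when the table is first opened.
    KuduTableInfo tableInfo = KuduTableUtils.createTableInfo("books", schema, props);

Note that the hash partition count is now configured independently of the replica count,
replacing the previous hard-coded replicas * 2.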
diff --git a/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 87%
copy from flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
copy to flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 487f2d9..6da911f 100644
--- a/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,5 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.connectors.kudu.table.KuduTableFactory
-org.apache.flink.connectors.kudu.table.KuduCatalogFactory
+org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory
\ No newline at end of file
diff --git a/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 487f2d9..3ae32fe 100644
--- a/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -14,4 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.connectors.kudu.table.KuduTableFactory
-org.apache.flink.connectors.kudu.table.KuduCatalogFactory
+org.apache.flink.connectors.kudu.table.dynamic.catalog.KuduCatalogFactory
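
Both registration files are consumed through Java's ServiceLoader. A minimal sketch of
the resolution the planner performs; the "kudu" identifier matches the 'connector'='kudu'
option used in the tests below, and is an assumption about what factoryIdentifier() returns:

    // Sketch only: how the planner resolves 'connector'='kudu' to the new factory.
    ServiceLoader<Factory> loader = ServiceLoader.load(Factory.class);
    for (Factory factory : loader) {
        if ("kudu".equals(factory.factoryIdentifier())) {
            // the planner hands this factory the table options to build the source/sink
        }
    }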
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
index 3a872c5..90b0746 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
@@ -17,6 +17,8 @@
 package org.apache.flink.connectors.kudu.connector;
 
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowConvertor;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowDataConvertor;
 import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
@@ -25,11 +27,20 @@ import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperation
 import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
 import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
 import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.types.Row;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.client.*;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.RowResult;
 import org.apache.kudu.shaded.com.google.common.collect.Lists;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -52,25 +63,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KuduTestBase {
 
-    private static final String DOCKER_IMAGE = "apache/kudu:1.11.1";
+    private static final String DOCKER_IMAGE = "apache/kudu:1.13.0";
     private static final Integer KUDU_MASTER_PORT = 7051;
     private static final Integer KUDU_TSERVER_PORT = 7050;
     private static final Integer NUMBER_OF_REPLICA = 3;
-
-    private static GenericContainer<?> master;
-    private static List<GenericContainer<?>> tServers;
-
-    private static String masterAddress;
-    private static KuduClient kuduClient;
-
     private static final Object[][] booksTableData = {
             {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11},
             {1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22},
             {1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33},
             {1004, "A Cup of Java", "Kumar", 44.44, 44},
             {1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55}};
-
     public static String[] columns = new String[]{"id", "title", "author", "price", "quantity"};
+    private static GenericContainer<?> master;
+    private static List<GenericContainer<?>> tServers;
+    private static String masterAddress;
+    private static KuduClient kuduClient;
 
     @BeforeAll
     public static void beforeClass() throws Exception {
@@ -91,7 +98,8 @@ public class KuduTestBase {
                     .withExposedPorts(KUDU_TSERVER_PORT)
                     .withCommand("tserver")
                     .withEnv("KUDU_MASTERS", "kudu-master:" + KUDU_MASTER_PORT)
-                    .withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --use_hybrid_clock=false --rpc_advertised_addresses=" + instanceName)
+                    .withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --use_hybrid_clock=false " +
+                            "--rpc_advertised_addresses=" + instanceName)
                     .withNetwork(network)
                     .withNetworkAliases(instanceName)
                     .dependsOn(master);
@@ -115,15 +123,6 @@ public class KuduTestBase {
         }
     }
 
-    public String getMasterAddress() {
-        return masterAddress;
-    }
-
-    public KuduClient getClient() {
-        return kuduClient;
-    }
-
-
     public static KuduTableInfo booksTableInfo(String tableName, boolean createIfNotExist) {
 
         KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);
@@ -159,7 +158,7 @@ public class KuduTestBase {
                                         (String) row[1],
                                         (String) row[2],
                                         (Double) row[3],
-                                        (Integer)row[4]);
+                                        (Integer) row[4]);
                         return values;
                     } else {
                         Tuple5<Integer, String, String, Double, Integer> values =
@@ -173,6 +172,39 @@ public class KuduTestBase {
                 .collect(Collectors.toList());
     }
 
+    public static TableSchema booksTableSchema() {
+        return TableSchema.builder()
+                .field("id", DataTypes.INT())
+                .field("title", DataTypes.STRING())
+                .field("author", DataTypes.STRING())
+                .field("price", DataTypes.DOUBLE())
+                .field("quantity", DataTypes.INT())
+                .build();
+    }
+
+    public static List<RowData> booksRowData() {
+        return Arrays.stream(booksTableData)
+                .map(row -> {
+                    Integer rowId = (Integer) row[0];
+                    if (rowId % 2 == 1) {
+                        GenericRowData values = new GenericRowData(5);
+                        values.setField(0, row[0]);
+                        values.setField(1, StringData.fromString(row[1].toString()));
+                        values.setField(2, StringData.fromString(row[2].toString()));
+                        values.setField(3, row[3]);
+                        values.setField(4, row[4]);
+                        return values;
+                    } else {
+                        GenericRowData values = new GenericRowData(5);
+                        values.setField(0, row[0]);
+                        values.setField(1, StringData.fromString(row[1].toString()));
+                        values.setField(2, StringData.fromString(row[2].toString()));
+                        return values;
+                    }
+                })
+                .collect(Collectors.toList());
+    }
+
     public static List<Row> booksDataRow() {
         return Arrays.stream(booksTableData)
                 .map(row -> {
@@ -198,19 +230,28 @@ public class KuduTestBase {
 
     public static List<BookInfo> booksDataPojo() {
         return Arrays.stream(booksTableData).map(row -> new BookInfo(
-                (int) row[0],
-                (String) row[1],
-                (String) row[2],
-                (Double) row[3],
-                (int) row[4]))
+                        (int) row[0],
+                        (String) row[1],
+                        (String) row[2],
+                        (Double) row[3],
+                        (int) row[4]))
                 .collect(Collectors.toList());
     }
 
+    public String getMasterAddress() {
+        return masterAddress;
+    }
+
+    public KuduClient getClient() {
+        return kuduClient;
+    }
+
     protected void setUpDatabase(KuduTableInfo tableInfo) {
         try {
             String masterAddresses = getMasterAddress();
             KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
-            KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
+            KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns,
+                    AbstractSingleOperationMapper.KuduOperation.INSERT));
             booksDataRow().forEach(row -> {
                 try {
                     kuduWriter.write(row);
@@ -228,7 +269,8 @@ public class KuduTestBase {
         try {
             String masterAddresses = getMasterAddress();
             KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
-            KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
+            KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns,
+                    AbstractSingleOperationMapper.KuduOperation.INSERT));
             kuduWriter.deleteTable();
             kuduWriter.close();
         } catch (Exception e) {
@@ -239,12 +281,12 @@ public class KuduTestBase {
     protected List<Row> readRows(KuduTableInfo tableInfo) throws Exception {
         String masterAddresses = getMasterAddress();
         KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
-        KuduReader reader = new KuduReader(tableInfo, readerConfig);
+        KuduReader<Row> reader = new KuduReader<>(tableInfo, readerConfig, new RowResultRowConvertor());
 
         KuduInputSplit[] splits = reader.createInputSplits(1);
         List<Row> rows = new ArrayList<>();
         for (KuduInputSplit split : splits) {
-            KuduReaderIterator resultIterator = reader.scanner(split.getScanToken());
+            KuduReaderIterator<Row> resultIterator = reader.scanner(split.getScanToken());
             while (resultIterator.hasNext()) {
                 Row row = resultIterator.next();
                 if (row != null) {
@@ -257,6 +299,27 @@ public class KuduTestBase {
         return rows;
     }
 
+    protected List<RowData> readRowDatas(KuduTableInfo tableInfo) throws Exception {
+        String masterAddresses = getMasterAddress();
+        KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
+        KuduReader<RowData> reader = new KuduReader<>(tableInfo, readerConfig, new RowResultRowDataConvertor());
+
+        KuduInputSplit[] splits = reader.createInputSplits(1);
+        List<RowData> rows = new ArrayList<>();
+        for (KuduInputSplit split : splits) {
+            KuduReaderIterator<RowData> resultIterator = reader.scanner(split.getScanToken());
+            while (resultIterator.hasNext()) {
+                RowData row = resultIterator.next();
+                if (row != null) {
+                    rows.add(row);
+                }
+            }
+        }
+        reader.close();
+
+        return rows;
+    }
+
     protected void kuduRowsTest(List<Row> rows) {
         for (Row row : rows) {
             Integer rowId = (Integer) row.getField(0);
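
The new RowData fixtures mirror the Row-based ones, with one detail worth spelling out:
RowData is Flink's internal row format, so string fields must be wrapped in StringData
rather than stored as java.lang.String. A small illustration:

    // RowData stores internal representations: primitives as-is, strings as StringData.
    GenericRowData row = new GenericRowData(2);
    row.setField(0, 1001);
    row.setField(1, StringData.fromString("Java for dummies"));
    int id = row.getInt(0);                      // read back as a primitive
    String title = row.getString(1).toString();  // StringData -> String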
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java
similarity index 98%
rename from flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java
index 693e113..dc8f777 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.connectors.kudu.batch;
+package org.apache.flink.connectors.kudu.format;
 
 import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduRowDataInputFormatTest.java
similarity index 62%
copy from flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
copy to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduRowDataInputFormatTest.java
index 126f7fd..aa391fc 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduRowDataInputFormatTest.java
@@ -14,14 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.connectors.kudu.batch;
+package org.apache.flink.connectors.kudu.format;
 
-import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowConvertor;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowDataConvertor;
 import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.Row;
-
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -29,19 +32,21 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-class KuduInputFormatTest extends KuduTestBase {
+class KuduRowDataInputFormatTest extends KuduTestBase {
 
     @Test
     void testInvalidKuduMaster() {
         KuduTableInfo tableInfo = booksTableInfo("books", false);
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(null, tableInfo));
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduRowDataInputFormat(null,
+                new RowResultRowDataConvertor(), tableInfo));
     }
 
     @Test
     void testInvalidTableInfo() {
         String masterAddresses = getMasterAddress();
         KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(readerConfig, null));
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduRowDataInputFormat(readerConfig,
+                new RowResultRowDataConvertor(), null));
     }
 
     @Test
@@ -49,7 +54,7 @@ class KuduInputFormatTest extends KuduTestBase {
         KuduTableInfo tableInfo = booksTableInfo("books", true);
         setUpDatabase(tableInfo);
 
-        List<Row> rows = readRows(tableInfo);
+        List<RowData> rows = readRowDatas(tableInfo);
         Assertions.assertEquals(5, rows.size());
 
         cleanDatabase(tableInfo);
@@ -60,21 +65,43 @@ class KuduInputFormatTest extends KuduTestBase {
         KuduTableInfo tableInfo = booksTableInfo("books", true);
         setUpDatabase(tableInfo);
 
-        List<Row> rows = readRows(tableInfo, "title", "id");
+        List<RowData> rows = readRowDatas(tableInfo, "title", "id");
         Assertions.assertEquals(5, rows.size());
 
-        for (Row row : rows) {
+        for (RowData row : rows) {
             Assertions.assertEquals(2, row.getArity());
         }
 
         cleanDatabase(tableInfo);
     }
 
+    private List<RowData> readRowDatas(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
+        String masterAddresses = getMasterAddress();
+        KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
+        KuduRowDataInputFormat inputFormat = new KuduRowDataInputFormat(readerConfig, new RowResultRowDataConvertor(),
+                tableInfo, new ArrayList<>(), fieldProjection == null ? null : Arrays.asList(fieldProjection));
+
+        KuduInputSplit[] splits = inputFormat.createInputSplits(1);
+        List<RowData> rows = new ArrayList<>();
+        for (KuduInputSplit split : splits) {
+            inputFormat.open(split);
+            while (!inputFormat.reachedEnd()) {
+                RowData row = inputFormat.nextRecord(new GenericRowData(5));
+                if (row != null) {
+                    rows.add(row);
+                }
+            }
+        }
+        inputFormat.close();
+
+        return rows;
+    }
+
     private List<Row> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
         String masterAddresses = getMasterAddress();
         KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
-        KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo, new ArrayList<>(),
-                fieldProjection == null ? null : Arrays.asList(fieldProjection));
+        KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, new RowResultRowConvertor(), tableInfo,
+                new ArrayList<>(), fieldProjection == null ? null : Arrays.asList(fieldProjection));
 
         KuduInputSplit[] splits = inputFormat.createInputSplits(1);
         List<Row> rows = new ArrayList<>();
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduRowInputFormatTest.java
similarity index 86%
rename from flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduRowInputFormatTest.java
index 126f7fd..cb413a5 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduRowInputFormatTest.java
@@ -14,14 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.connectors.kudu.batch;
+package org.apache.flink.connectors.kudu.format;
 
-import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowConvertor;
 import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
 import org.apache.flink.types.Row;
-
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -29,19 +29,21 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-class KuduInputFormatTest extends KuduTestBase {
+class KuduRowInputFormatTest extends KuduTestBase {
 
     @Test
     void testInvalidKuduMaster() {
         KuduTableInfo tableInfo = booksTableInfo("books", false);
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(null, tableInfo));
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(null,
+                new RowResultRowConvertor(), tableInfo));
     }
 
     @Test
     void testInvalidTableInfo() {
         String masterAddresses = getMasterAddress();
         KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
-        Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(readerConfig, null));
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(readerConfig,
+                new RowResultRowConvertor(), null));
     }
 
     @Test
@@ -73,8 +75,8 @@ class KuduInputFormatTest extends KuduTestBase {
     private List<Row> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
         String masterAddresses = getMasterAddress();
         KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
-        KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo, new ArrayList<>(),
-                fieldProjection == null ? null : Arrays.asList(fieldProjection));
+        KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, new RowResultRowConvertor(), tableInfo,
+                new ArrayList<>(), fieldProjection == null ? null : Arrays.asList(fieldProjection));
 
         KuduInputSplit[] splits = inputFormat.createInputSplits(1);
         List<Row> rows = new ArrayList<>();
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
index 2a043f9..d3d4a63 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
@@ -56,10 +56,12 @@ public class KuduTableSourceITCase extends KuduTestBase {
         tableEnv.sqlUpdate("DROP TABLE books");
     }
 
+
     @Test
     void testScanWithProjectionAndFilter() throws Exception {
         // (price > 30 and price < 40)
-        CloseableIterator<Row> it = tableEnv.executeSql("SELECT title FROM books WHERE id IN (1003, 1004) and quantity < 40").collect();
+        CloseableIterator<Row> it = tableEnv.executeSql("SELECT title FROM books WHERE id IN (1003, 1004) and " +
+                "quantity < 40").collect();
         List<Row> results = new ArrayList<>();
         it.forEachRemaining(results::add);
         assertEquals(1, results.size());
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSinkTest.java
new file mode 100644
index 0000000..8dd1a79
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSinkTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table.dynamic;
+
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Unit Tests for {@link KuduDynamicTableSink}.
+ */
+public class KuduDynamicSinkTest extends KuduTestBase {
+    public static final String INPUT_TABLE = "books";
+    public static StreamExecutionEnvironment env;
+    public static TableEnvironment tEnv;
+
+    @BeforeEach
+    public void init() {
+        KuduTableInfo tableInfo = booksTableInfo(INPUT_TABLE, true);
+        setUpDatabase(tableInfo);
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+    }
+
+    @AfterEach
+    public void clean() {
+        KuduTableInfo tableInfo = booksTableInfo(INPUT_TABLE, true);
+        cleanDatabase(tableInfo);
+    }
+
+    @Test
+    public void testKuduSink() throws Exception {
+        String createSql = "CREATE TABLE "
+                + INPUT_TABLE
+                + "("
+                + "id int,"
+                + "title string,"
+                + "author string,"
+                + "price double,"
+                + "quantity int"
+                + ") WITH ("
+                + "  'connector'='kudu',"
+                + "  'kudu.masters'='"
+                + getMasterAddress()
+                + "',"
+                + "  'kudu.table'='"
+                + INPUT_TABLE
+                + "','kudu.primary-key-columns'='id"
+                + "','kudu.max-buffer-size'='1024"
+                + "','kudu.flush-interval'='1000"
+                + "','kudu.operation-timeout'='500"
+                + "','kudu.ignore-not-found'='true'"
+                + ")";
+        tEnv.executeSql(createSql);
+        tEnv.executeSql("insert into " + INPUT_TABLE + " values(1006,'test title','test author',10.1,10)");
+        CloseableIterator<Row> collected =
+                tEnv.executeSql("select * from " + INPUT_TABLE + " where id =1006").collect();
+        assertNotNull(collected);
+    }
+}
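
The assertion above only verifies that a result iterator was produced. A stricter variant
would materialize the rows and compare values, e.g. this sketch, reusing CollectionUtil
the same way the source tests below do:

    // Sketch only: materialize and check the row written by the INSERT above.
    List<String> rows = CollectionUtil.iteratorToList(collected).stream()
            .map(Row::toString)
            .collect(Collectors.toList());
    assertEquals(Collections.singletonList("+I[1006, test title, test author, 10.1, 10]"), rows);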
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java
new file mode 100644
index 0000000..2cab282
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table.dynamic;
+
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Unit Tests for {@link KuduDynamicTableSource}.
+ */
+public class KuduDynamicSourceTest extends KuduTestBase {
+    public static final String INPUT_TABLE = "books";
+    public static StreamExecutionEnvironment env;
+    public static TableEnvironment tEnv;
+
+    @BeforeEach
+    public void init() {
+        KuduTableInfo tableInfo = booksTableInfo(INPUT_TABLE, true);
+        setUpDatabase(tableInfo);
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+    }
+
+    @AfterEach
+    public void clean() {
+        KuduTableInfo tableInfo = booksTableInfo(INPUT_TABLE, true);
+        cleanDatabase(tableInfo);
+    }
+
+    @Test
+    public void testKuduSource() throws Exception {
+        // "id", "title", "author", "price", "quantity"
+        tEnv.executeSql(
+                "CREATE TABLE "
+                        + INPUT_TABLE
+                        + "("
+                        + "id int,"
+                        + "title string,"
+                        + "author string,"
+                        + "price double,"
+                        + "quantity int"
+                        + ") WITH ("
+                        + "  'connector'='kudu',"
+                        + "  'kudu.masters'='"
+                        + getMasterAddress()
+                        + "',"
+                        + "  'kudu.table'='"
+                        + INPUT_TABLE
+                        + "',"
+                        + "'kudu.scan.row-size'='10',"
+                        + "'kudu.primary-key-columns'='id'"
+                        + ")");
+
+        Iterator<Row> collected = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE).collect();
+        assertNotNull(collected);
+    }
+
+    @Test
+    public void testProject() throws Exception {
+        tEnv.executeSql(
+                "CREATE TABLE "
+                        + INPUT_TABLE
+                        + "("
+                        + "id int,"
+                        + "title string,"
+                        + "author string,"
+                        + "price double,"
+                        + "quantity int"
+                        + ") WITH ("
+                        + "  'connector'='kudu',"
+                        + "  'kudu.masters'='"
+                        + getMasterAddress()
+                        + "',"
+                        + "  'kudu.table'='"
+                        + INPUT_TABLE
+                        + "',"
+                        + "'kudu.scan.row-size'='10',"
+                        + "'kudu.primary-key-columns'='id'"
+                        + ")");
+
+        Iterator<Row> collected =
+                tEnv.executeSql("SELECT id,title,author FROM " + INPUT_TABLE)
+                        .collect();
+        assertNotNull(collected);
+        List<String> result =
+                CollectionUtil.iteratorToList(collected).stream()
+                        .map(Row::toString)
+                        .sorted()
+                        .collect(Collectors.toList());
+        List<String> expected =
+                Stream.of(
+                                "+I[1001, Java for dummies, Tan Ah Teck]",
+                                "+I[1002, More Java for dummies, Tan Ah Teck]",
+                                "+I[1003, More Java for more dummies, Mohammad Ali]",
+                                "+I[1004, A Cup of Java, Kumar]",
+                                "+I[1005, A Teaspoon of Java, Kevin Jones]")
+                        .sorted()
+                        .collect(Collectors.toList());
+        assertEquals(expected, result);
+    }
+
+    @Test
+    public void testLimit() throws Exception {
+        tEnv.executeSql(
+                "CREATE TABLE "
+                        + INPUT_TABLE
+                        + "("
+                        + "id int,"
+                        + "title string,"
+                        + "author string,"
+                        + "price double,"
+                        + "quantity int"
+                        + ") WITH ("
+                        + "  'connector'='kudu',"
+                        + "  'kudu.masters'='"
+                        + getMasterAddress()
+                        + "',"
+                        + "  'kudu.table'='"
+                        + INPUT_TABLE
+                        + "',"
+                        + "'kudu.scan.row-size'='10',"
+                        + "'kudu.primary-key-columns'='id'"
+                        + ")");
+
+        Iterator<Row> collected =
+                tEnv.executeSql("SELECT * FROM " + INPUT_TABLE + " LIMIT 1").collect();
+        List<String> result =
+                CollectionUtil.iteratorToList(collected).stream()
+                        .map(Row::toString)
+                        .sorted()
+                        .collect(Collectors.toList());
+        assertEquals(1, result.size());
+    }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduRowDataLookupFunctionTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduRowDataLookupFunctionTest.java
new file mode 100644
index 0000000..bc3d51a
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduRowDataLookupFunctionTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table.dynamic;
+
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.table.function.lookup.KuduLookupOptions;
+import org.apache.flink.connectors.kudu.table.function.lookup.KuduRowDataLookupFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+
+/**
+ * Unit Tests for {@link KuduRowDataLookupFunction}.
+ */
+public class KuduRowDataLookupFunctionTest extends KuduTestBase {
+    public static final String INPUT_TABLE = "books";
+    public static KuduTableInfo tableInfo;
+
+    @BeforeEach
+    public void init() {
+        tableInfo = booksTableInfo(INPUT_TABLE, true);
+        setUpDatabase(tableInfo);
+    }
+
+    @AfterEach
+    public void clean() {
+        KuduTableInfo tableInfo = booksTableInfo(INPUT_TABLE, true);
+        cleanDatabase(tableInfo);
+    }
+
+    @Test
+    public void testEval() throws Exception {
+        KuduLookupOptions lookupOptions = KuduLookupOptions.builder().build();
+
+        KuduRowDataLookupFunction lookupFunction = buildRowDataLookupFunction(lookupOptions, new String[]{
+                "id"});
+
+        ListOutputCollector collector = new ListOutputCollector();
+        lookupFunction.setCollector(collector);
+
+        lookupFunction.open(null);
+
+        lookupFunction.eval(1001);
+
+        lookupFunction.eval(1002);
+
+        lookupFunction.eval(1003);
+
+        List<String> result =
+                new ArrayList<>(collector.getOutputs())
+                        .stream().map(RowData::toString).sorted().collect(Collectors.toList());
+
+        assertNotNull(result);
+    }
+
+    @Test
+    public void testCacheEval() throws Exception {
+        KuduLookupOptions lookupOptions = KuduLookupOptions.builder()
+                .withCacheMaxSize(1024)
+                .withMaxRetryTimes(3)
+                .withCacheExpireMs(10)
+                .build();
+
+        KuduRowDataLookupFunction lookupFunction = buildRowDataLookupFunction(lookupOptions, new String[]{
+                "id"});
+
+        ListOutputCollector collector = new ListOutputCollector();
+        lookupFunction.setCollector(collector);
+
+        lookupFunction.open(null);
+
+        lookupFunction.eval(1001);
+
+        lookupFunction.eval(1002);
+
+        lookupFunction.eval(1003);
+
+        List<String> result =
+                new ArrayList<>(collector.getOutputs())
+                        .stream().map(RowData::toString).sorted().collect(Collectors.toList());
+
+        assertNotNull(result);
+    }
+
+    private KuduRowDataLookupFunction buildRowDataLookupFunction(KuduLookupOptions lookupOptions, String[] keyNames) {
+        KuduReaderConfig config = KuduReaderConfig.Builder.setMasters(getMasterAddress())
+                .setRowLimit(10)
+                .build();
+        return new KuduRowDataLookupFunction.Builder()
+                .kuduReaderConfig(config)
+                .kuduLookupOptions(lookupOptions)
+                .keyNames(keyNames)
+                .projectedFields(getFieldNames())
+                .tableInfo(tableInfo)
+                .build();
+
+    }
+
+    private String[] getFieldNames() {
+        return new String[]{
+                "id", "title", "author", "price", "quantity"
+        };
+    }
+
+    /**
+     * Output collector that buffers emitted rows so tests can assert on them.
+     */
+    private static final class ListOutputCollector implements Collector<RowData> {
+
+        private final List<RowData> output = new ArrayList<>();
+
+        @Override
+        public void collect(RowData row) {
+            this.output.add(row);
+        }
+
+        @Override
+        public void close() {
+        }
+
+        public List<RowData> getOutputs() {
+            return output;
+        }
+    }
+}
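
These unit tests drive the function directly. From SQL, a lookup function like this is
typically exercised through a lookup join; a hedged sketch, where the 'orders' table and
its proc_time attribute are illustrative:

    // Sketch only: a processing-time lookup join against the Kudu-backed 'books' table.
    tEnv.executeSql(
            "SELECT o.order_id, b.title "
                    + "FROM orders AS o "
                    + "JOIN books FOR SYSTEM_TIME AS OF o.proc_time AS b "
                    + "ON o.book_id = b.id");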
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowDataUpsertOperationMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowDataUpsertOperationMapperTest.java
new file mode 100644
index 0000000..3cda636
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowDataUpsertOperationMapperTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.writer;
+
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.writer.RowDataUpsertOperationMapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.kudu.client.Operation;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit Tests for {@link RowDataUpsertOperationMapper}.
+ */
+public class RowDataUpsertOperationMapperTest extends AbstractOperationTest {
+
+    @Test
+    void testGetField() {
+        RowDataUpsertOperationMapper mapper =
+                new RowDataUpsertOperationMapper(KuduTestBase.booksTableSchema());
+        RowData inputRow = KuduTestBase.booksRowData().get(0);
+
+        Assertions.assertEquals(inputRow.getInt(0), mapper.getField(inputRow, 0));
+    }
+
+
+    @Test
+    void testCorrectOperationUpsert() {
+        RowDataUpsertOperationMapper mapper =
+                new RowDataUpsertOperationMapper(KuduTestBase.booksTableSchema());
+        RowData inputRow = KuduTestBase.booksRowData().get(0);
+
+        List<Operation> operations = mapper.createOperations(inputRow, mockTable);
+
+        assertEquals(1, operations.size());
+        verify(mockTable).newUpsert();
+    }
+}
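
For orientation, the contract these two tests pin down: getField reads positionally from
the internal row, and createOperations emits exactly one Kudu operation per row. A hedged
sketch of the changelog dispatch such an upsert mapper performs (not necessarily the
actual implementation):

    // Sketch only: RowKind -> Kudu operation dispatch assumed by the upsert test.
    Operation toOperation(KuduTable table, RowData row) {
        switch (row.getRowKind()) {
            case INSERT:
            case UPDATE_AFTER:
                return table.newUpsert();  // upsert covers both insert and update
            case DELETE:
                return table.newDelete();  // delete by primary key
            default:
                return null;               // UPDATE_BEFORE maps to no operation
        }
    }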