You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/06/22 06:19:33 UTC

[rocketmq-connect] branch master updated: [ISSUE #153]upgrade rocketmq connect JDBC plug-in (#154)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 8074d49  [ISSUE #153]upgrade rocketmq connect JDBC plug-in (#154)
8074d49 is described below

commit 8074d49d60be757558f15a367047f48a0ea4e395
Author: xiaoyi <su...@163.com>
AuthorDate: Wed Jun 22 14:19:28 2022 +0800

    [ISSUE #153]upgrade rocketmq connect JDBC plug-in (#154)
    
    * upgrade rocketmq connect JDBC plug-in #153
    
    * upgrade rocketmq jdbc plugin and runtime api to 0.1.3 #153
    
    * fixed
    
    * add Decimal logical
    
    * optimize transform api
    
    * upgrade api to 0.1.3
    
    * fixed
    
    * fixed
    
    * update api version 0.1.3
    
    * fixed
    
    Co-authored-by: sunxiaojian <sunxiaojian926@163.com>
---
 connectors/rocketmq-connect-jdbc/pom.xml           | 267 +++++++++++----------
 .../connect/jdbc/config/AbstractConfig.java        |   2 +-
 .../connect/jdbc/connector/JdbcSinkConnector.java  |  28 +--
 .../connect/jdbc/connector/JdbcSinkTask.java       |  30 +--
 .../connect/jdbc/connector/JdbcSourceConfig.java   |   4 +-
 .../jdbc/connector/JdbcSourceConnector.java        |  23 +-
 .../connect/jdbc/connector/JdbcSourceTask.java     |  55 ++---
 .../connect/jdbc/dialect/DatabaseDialect.java      |  18 +-
 .../jdbc/dialect/DatabaseDialectFactory.java       |   1 +
 .../jdbc/dialect/PreparedStatementBinder.java      |  13 +-
 .../jdbc/dialect/impl/GenericDatabaseDialect.java  | 114 ++++-----
 .../jdbc/dialect/impl/MySqlDatabaseDialect.java    |  10 +-
 .../jdbc/dialect/impl/OpenMLDBDatabaseDialect.java | 107 ++++++++-
 .../connect/jdbc/sink/BufferedRecords.java         |   7 +-
 .../jdbc/source/TimestampIncrementingCriteria.java |  13 +-
 .../jdbc/source/metadata/SchemaMapping.java        |  17 +-
 .../connect/jdbc/source/querier/BulkQuerier.java   |   3 +-
 .../connect/jdbc/source/querier/Querier.java       |   2 +-
 .../querier/TimestampIncrementingQuerier.java      |  11 +-
 .../rocketmq/connect/jdbc/util/BytesUtil.java      |   6 +-
 .../rocketmq/connect/jdbc/util/NumericMapping.java |   6 +-
 .../connect/jdbc/connector/sink/JdbcSinkTest.java  |  18 ++
 .../jdbc/connector/sink/OpenMLDBJdbcSinkTest.java  |  23 +-
 pom.xml                                            |   2 +-
 .../runtime/connectorwrapper/TransformChain.java   |   3 +-
 .../connect/runtime/connectorwrapper/Worker.java   |   3 +-
 .../runtime/connectorwrapper/WorkerConnector.java  |   4 +-
 .../runtime/connectorwrapper/WorkerDirectTask.java |  31 ++-
 .../runtime/connectorwrapper/WorkerSinkTask.java   |   4 +-
 .../connectorwrapper/WorkerSinkTaskContext.java    |  11 +
 .../runtime/connectorwrapper/WorkerSourceTask.java |  17 +-
 .../service/ConfigManagementServiceImpl.java       |   2 +-
 .../memory/MemoryConfigManagementServiceImpl.java  |   2 +-
 .../connectorwrapper/testimpl/TestConnector.java   |  23 +-
 .../connectorwrapper/testimpl/TestSourceTask.java  |  15 +-
 .../connectorwrapper/testimpl/TestTask.java        |  18 +-
 .../rocketmq/connect/file/FileSinkConnector.java   |  17 +-
 .../apache/rocketmq/connect/file/FileSinkTask.java |  28 +--
 .../rocketmq/connect/file/FileSourceConnector.java |  11 +-
 .../rocketmq/connect/file/FileSourceTask.java      |  24 +-
 .../rocketmq/connect/file/FilterTransform.java     |  23 +-
 41 files changed, 537 insertions(+), 479 deletions(-)

diff --git a/connectors/rocketmq-connect-jdbc/pom.xml b/connectors/rocketmq-connect-jdbc/pom.xml
index eaf4336..59d7f4b 100644
--- a/connectors/rocketmq-connect-jdbc/pom.xml
+++ b/connectors/rocketmq-connect-jdbc/pom.xml
@@ -16,9 +16,10 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-connect-jdbc</artifactId>
+    <packaging>jar</packaging>
     <version>0.0.1-SNAPSHOT</version>
 
-    <name>connect-jdbc</name>
+    <name>rocketmq-connect-jdbc</name>
     <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-jdbc</url>
 
     <licenses>
@@ -36,21 +37,144 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-
         <!-- Compiler settings properties -->
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <rocketmq.version>4.5.2</rocketmq.version>
+
+        <!--test jar-->
+        <junit.version>4.13.1</junit.version>
+        <assertj.version>2.6.0</assertj.version>
+        <mockito.version>2.6.3</mockito.version>
+
+        <!--rocket connect api-->
+        <openmessaging-connector.version>0.1.3</openmessaging-connector.version>
+        <openmessaging-api.version>0.3.1-alpha</openmessaging-api.version>
+
+        <!--fast json-->
+        <fastjson.version>1.2.83</fastjson.version>
+
     </properties>
 
 
+    <dependencies>
+
+        <!---->
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+
+        <!--rocketmq connect api-->
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>${openmessaging-connector.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+            <version>${openmessaging-api.version}</version>
+        </dependency>
+
+        <!--junit -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!--fast json version-->
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>${fastjson.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.2.9</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.12</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>8.0.16</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.4paradigm.openmldb</groupId>
+            <artifactId>openmldb-native</artifactId>
+            <version>0.5.0-macos</version>
+        </dependency>
+        <dependency>
+            <groupId>com.4paradigm.openmldb</groupId>
+            <artifactId>openmldb-jdbc</artifactId>
+            <version>0.5.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.4paradigm.openmldb</groupId>
+                    <artifactId>openmldb-native</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+
     <build>
-        <resources>
-            <resource>
-                <directory>src/main/resources</directory>
-                <filtering>true</filtering>
-            </resource>
-        </resources>
         <plugins>
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
@@ -147,6 +271,11 @@
                 <artifactId>maven-assembly-plugin</artifactId>
                 <version>3.0.0</version>
                 <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.rocketmq.connect.redis.connector.RedisSourceConnector</mainClass>
+                        </manifest>
+                    </archive>
                     <descriptorRefs>
                         <descriptorRef>jar-with-dependencies</descriptorRef>
                     </descriptorRefs>
@@ -184,126 +313,4 @@
             </plugin>
         </plugins>
     </build>
-
-    <dependencies>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.13.1</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.assertj</groupId>
-            <artifactId>assertj-core</artifactId>
-            <version>2.6.0</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <version>2.6.3</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>commons-codec</groupId>
-            <artifactId>commons-codec</artifactId>
-            <version>1.12</version>
-        </dependency>
-        <dependency>
-            <groupId>io.openmessaging</groupId>
-            <artifactId>openmessaging-connector</artifactId>
-            <version>0.1.2</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>io.openmessaging</groupId>
-            <artifactId>openmessaging-api</artifactId>
-            <version>0.3.1-alpha</version>
-        </dependency>
-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>fastjson</artifactId>
-            <version>1.2.83</version>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <version>1.7.7</version>
-        </dependency>
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-classic</artifactId>
-            <version>1.2.0</version>
-        </dependency>
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-core</artifactId>
-            <version>1.2.9</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-client</artifactId>
-            <version>${rocketmq.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-tools</artifactId>
-            <version>${rocketmq.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-remoting</artifactId>
-            <version>${rocketmq.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-openmessaging</artifactId>
-            <version>4.3.2</version>
-        </dependency>
-
-        <dependency>
-            <groupId>commons-cli</groupId>
-            <artifactId>commons-cli</artifactId>
-            <version>1.2</version>
-        </dependency>
-        <dependency>
-            <groupId>mysql</groupId>
-            <artifactId>mysql-connector-java</artifactId>
-            <version>8.0.28</version>
-        </dependency>
-        <dependency>
-            <groupId>io.javalin</groupId>
-            <artifactId>javalin</artifactId>
-            <version>1.3.0</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.github.shyiko</groupId>
-            <artifactId>mysql-binlog-connector-java</artifactId>
-            <version>0.20.1</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>druid</artifactId>
-            <version>1.1.22</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.4paradigm.openmldb</groupId>
-            <artifactId>openmldb-native</artifactId>
-            <version>0.5.0-macos</version>
-        </dependency>
-        <dependency>
-            <groupId>com.4paradigm.openmldb</groupId>
-            <artifactId>openmldb-jdbc</artifactId>
-            <version>0.5.0</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.4paradigm.openmldb</groupId>
-                    <artifactId>openmldb-native</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-    </dependencies>
 </project>
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java
index e4fd758..9d53fdc 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java
@@ -129,7 +129,7 @@ public abstract class AbstractConfig {
     }
 
     protected Boolean getBoolean(KeyValue config, String key, Boolean defaultValue) {
-        return config.containsKey(key) ? Boolean.getBoolean(config.getString(key)) : defaultValue;
+        return config.containsKey(key) ? Boolean.parseBoolean(config.getString(key)) : defaultValue;
     }
 
 }
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
index aaf8560..4cfa328 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
@@ -21,6 +21,7 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.connector.ConnectorContext;
 import io.openmessaging.connector.api.component.task.Task;
 import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,14 +32,13 @@ import java.util.List;
 /**
  * jdbc sink connector
  */
-public class JdbcSinkConnector extends SinkConnector {
+public class JdbcSinkConnector extends SinkConnector{
     private static final Logger log = LoggerFactory.getLogger(JdbcSinkConnector.class);
     private KeyValue connectConfig;
-    private ConnectorContext context;
 
     @Override
-    public void start(ConnectorContext context) {
-        this.context = context;
+    public void start(KeyValue config) {
+        this.connectConfig = config;
     }
 
     /**
@@ -52,26 +52,9 @@ public class JdbcSinkConnector extends SinkConnector {
         // do validate config
     }
 
-    /**
-     * Init the component
-     *
-     * @param config
-     */
-    @Override
-    public void init(KeyValue config) {
-        this.connectConfig = config;
-    }
-
     @Override
     public void stop() {
-    }
-
-    @Override
-    public void pause() {
-    }
-
-    @Override
-    public void resume() {
+        this.connectConfig = null;
     }
 
     /**
@@ -95,5 +78,4 @@ public class JdbcSinkConnector extends SinkConnector {
     public Class<? extends Task> taskClass() {
         return JdbcSinkTask.class;
     }
-
 }
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
index d69c80a..35656eb 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
@@ -69,7 +69,7 @@ public class JdbcSinkTask extends SinkTask {
             SQLException sqlAllMessagesException = getAllMessagesException(sqle);
             if (remainingRetries > 0) {
                 updater.closeQuietly();
-                init(originalConfig);
+                start(originalConfig);
                 remainingRetries--;
                 throw new RetriableException(sqlAllMessagesException);
             }
@@ -87,30 +87,13 @@ public class JdbcSinkTask extends SinkTask {
         return sqlAllMessagesException;
     }
 
-
-    @Override
-    public void start(SinkTaskContext context) {
-        this.context = context;
-    }
-
     /**
-     * Should invoke before start the connector.
-     *
-     * @param config
-     * @return error message
-     */
-    @Override
-    public void validate(KeyValue config) {
-        // to do nothing
-    }
-
-    /**
-     * Init the component
+     * Start the component
      *
      * @param keyValue
      */
     @Override
-    public void init(KeyValue keyValue) {
+    public void start(KeyValue keyValue) {
         originalConfig = keyValue;
         config = new JdbcSinkConfig(keyValue);
         remainingRetries = config.getMaxRetries();
@@ -142,12 +125,5 @@ public class JdbcSinkTask extends SinkTask {
         }
     }
 
-    @Override
-    public void pause() {
-    }
-
-    @Override
-    public void resume() {
-    }
 
 }
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java
index 20f3aa7..99178bf 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java
@@ -232,8 +232,8 @@ public class JdbcSourceConfig extends AbstractConfig {
         this.timestampColumnNames = getList(config, TIMESTAMP_COLUMN_NAME_CONFIG);
         timestampDelayIntervalMs = config.getLong(TIMESTAMP_DELAY_INTERVAL_MS_CONFIG);
 //        this.timestampInitial=config.getLong(TIMESTAMP_INITIAL_CONFIG,TIMESTAMP_INITIAL_DEFAULT);
-        if (config.containsKey(TIMESTAMP_INITIAL_CONFIG)){
-            this.timestampInitial=config.getLong(TIMESTAMP_INITIAL_CONFIG);
+        if (config.containsKey(TIMESTAMP_INITIAL_CONFIG)) {
+            this.timestampInitial = config.getLong(TIMESTAMP_INITIAL_CONFIG);
         }
         this.tableWhitelist = new HashSet<>(getList(config, TABLE_WHITELIST_CONFIG));
         this.tableBlacklist = new HashSet<>(getList(config, TABLE_BLACKLIST_CONFIG));
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index c48da71..4ad9e4f 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -30,6 +30,9 @@ import org.slf4j.LoggerFactory;
 
 import io.openmessaging.KeyValue;
 
+/**
+ * jdbc source connector
+ */
 public class JdbcSourceConnector extends SourceConnector {
     private static final Logger log = LoggerFactory.getLogger(JdbcSourceConnector.class);
     private JdbcSourceConfig jdbcSourceConfig;
@@ -49,28 +52,20 @@ public class JdbcSourceConnector extends SourceConnector {
     }
 
     /**
-     * Init the component
+     * Start the component
      *
-     * @param config
+     * @param config component context
      */
     @Override
-    public void init(KeyValue config) {
-        if (config.containsKey("connect-topicname")) {
-            config.put("connect-topicname", "");
-        }
+    public void start(KeyValue config) {
         originalConfig = config;
     }
 
-    @Override
-    public void stop() {
-    }
-
-    @Override
-    public void pause() {
-    }
 
     @Override
-    public void resume() {
+    public void stop() {
+        this.originalConfig = null;
+        this.jdbcSourceConfig = null;
     }
 
     /**
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index 80a7117..192f676 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.connect.jdbc.connector;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTask;
-import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
@@ -157,13 +156,25 @@ public class JdbcSourceTask extends SourceTask {
 
     /**
      * start jdbc task
-     *
-     * @param context
      */
     @Override
-    public void start(SourceTaskContext context) {
+    public void start(KeyValue props) {
+        // init config
+        config = new JdbcSourceTaskConfig(props);
+        final String dialectName = config.getDialectName();
+        final String url = config.getConnectionDbUrl();
+        if (dialectName != null && !dialectName.trim().isEmpty()) {
+            dialect = DatabaseDialectFactory.create(dialectName, config);
+        } else {
+            dialect = DatabaseDialectFactory.findDialectFor(url, config);
+        }
+        final int maxConnAttempts = config.getAttempts();
+        final long retryBackoff = config.getBackoffMs();
+        cachedConnectionProvider = connectionProvider(maxConnAttempts, retryBackoff);
+        log.info("Using JDBC dialect {}", dialect.name());
+
         // compute table offset
-        Map<String, Map<String, Object>> offsetValues = SourceOffsetCompute.initOffset(config, context, dialect, cachedConnectionProvider);
+        Map<String, Map<String, Object>> offsetValues = SourceOffsetCompute.initOffset(config, sourceTaskContext, dialect, cachedConnectionProvider);
         for (String tableOrQuery : offsetValues.keySet()) {
             this.buildAndAddQuerier(
                     JdbcSourceConfig.TableLoadMode.findTableLoadModeByName(this.config.getMode()),
@@ -258,30 +269,6 @@ public class JdbcSourceTask extends SourceTask {
         }
     }
 
-    /**
-     * Init the component
-     *
-     * @param props
-     */
-    @Override
-    public void init(KeyValue props) {
-        try {
-            config = new JdbcSourceTaskConfig(props);
-            final String dialectName = config.getDialectName();
-            final String url = config.getConnectionDbUrl();
-            if (dialectName != null && !dialectName.trim().isEmpty()) {
-                dialect = DatabaseDialectFactory.create(dialectName, config);
-            } else {
-                dialect = DatabaseDialectFactory.findDialectFor(url, config);
-            }
-            final int maxConnAttempts = config.getAttempts();
-            final long retryBackoff = config.getBackoffMs();
-            cachedConnectionProvider = connectionProvider(maxConnAttempts, retryBackoff);
-            log.info("Using JDBC dialect {}", dialect.name());
-        } catch (Exception e) {
-            log.error("Cannot start Jdbc Source Task because of configuration error{}", e);
-        }
-    }
 
     protected CachedConnectionProvider connectionProvider(int maxConnAttempts, long retryBackoff) {
         return new CachedConnectionProvider(dialect, maxConnAttempts, retryBackoff) {
@@ -321,14 +308,4 @@ public class JdbcSourceTask extends SourceTask {
             }
         }
     }
-
-    @Override
-    public void pause() {
-        // do nothing
-    }
-
-    @Override
-    public void resume() {
-        // do nothing
-    }
 }
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialect.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialect.java
index 61c9d6e..14728af 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialect.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialect.java
@@ -18,17 +18,18 @@ package org.apache.rocketmq.connect.jdbc.dialect;
 
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
 import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
+import org.apache.rocketmq.connect.jdbc.dialect.provider.ConnectionProvider;
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnDefinition;
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
 import org.apache.rocketmq.connect.jdbc.schema.table.TableDefinition;
 import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
-import org.apache.rocketmq.connect.jdbc.source.metadata.ColumnMapping;
 import org.apache.rocketmq.connect.jdbc.sink.metadata.FieldsMetadata;
 import org.apache.rocketmq.connect.jdbc.sink.metadata.SchemaPair;
 import org.apache.rocketmq.connect.jdbc.sink.metadata.SinkRecordField;
-import org.apache.rocketmq.connect.jdbc.dialect.provider.ConnectionProvider;
 import org.apache.rocketmq.connect.jdbc.source.TimestampIncrementingCriteria;
+import org.apache.rocketmq.connect.jdbc.source.metadata.ColumnMapping;
 import org.apache.rocketmq.connect.jdbc.util.ExpressionBuilder;
 import org.apache.rocketmq.connect.jdbc.util.IdentifierRules;
 
@@ -56,6 +57,15 @@ public interface DatabaseDialect extends ConnectionProvider {
      */
     String name();
 
+    /**
+     * get dialect class
+     *
+     * @return
+     */
+    default Class getDialectClass() {
+        return this.getClass();
+    }
+
     /**
      * create jdbc prepared statement
      *
@@ -149,11 +159,9 @@ public interface DatabaseDialect extends ConnectionProvider {
      * add field to schema
      *
      * @param column
-     * @param schema
-     * @param index
      * @return
      */
-    String addFieldToSchema(ColumnDefinition column, Schema schema, int index);
+    String addFieldToSchema(ColumnDefinition column, SchemaBuilder schemaBuilder);
 
     /**
      * Apply the supplied DDL statements using the given connection. This gives the dialect the
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialectFactory.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialectFactory.java
index 0938684..b93d580 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialectFactory.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialectFactory.java
@@ -137,6 +137,7 @@ public class DatabaseDialectFactory {
                 return provider.create(config);
             }
         }
+
         throw new ConnectException(
                 "Unable to find dialect with name '" + dialectName + "' in the available dialects: "
                         + dialectNames
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java
index dc9c0ae..75155c4 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java
@@ -18,9 +18,11 @@ package org.apache.rocketmq.connect.jdbc.dialect;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.Struct;
 import io.openmessaging.connector.api.errors.ConnectException;
 import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
 import org.apache.rocketmq.connect.jdbc.sink.metadata.FieldsMetadata;
@@ -98,10 +100,12 @@ public class PreparedStatementBinder implements DatabaseDialect.StatementBinder
                 }
                 break;
             case RECORD_VALUE: {
-                Object[] data = JSONArray.parseArray(JSON.toJSONString(record.getData())).stream().toArray();
+                String jsonData = JSON.toJSONString(record.getData(), SerializerFeature.DisableCircularReferenceDetect);
+                Struct struct = JSON.parseObject(jsonData, Struct.class);
+                struct.setValues(JSON.parseObject(jsonData).getJSONArray("values").toArray());
                 for (String fieldName : fieldsMetadata.keyFieldNames) {
                     final Field field = schemaPair.schema.getField(fieldName);
-                    bindField(index++, field.getSchema(), data[field.getIndex()], fieldName);
+                    bindField(index++, field.getSchema(), struct.get(fieldName), fieldName);
                 }
             }
             break;
@@ -115,9 +119,12 @@ public class PreparedStatementBinder implements DatabaseDialect.StatementBinder
             ConnectRecord record,
             int index
     ) throws SQLException {
+        String jsonData = JSON.toJSONString(record.getData(), SerializerFeature.DisableCircularReferenceDetect);
+        Struct struct = JSON.parseObject(jsonData, Struct.class);
+        struct.setValues(JSON.parseObject(jsonData).getJSONArray("values").toArray());
         for (final String fieldName : fieldsMetadata.nonKeyFieldNames) {
             final Field field = record.getSchema().getField(fieldName);
-            bindField(index++, field.getSchema(), ((Object[]) record.getData())[field.getIndex()], fieldName);
+            bindField(index++, field.getSchema(), struct.get(fieldName), fieldName);
         }
         return index;
     }
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
index d9e8dee..089d10d 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
@@ -16,11 +16,12 @@
  */
 package org.apache.rocketmq.connect.jdbc.dialect.impl;
 
-import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.FieldType;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.connector.api.data.SchemaBuilder;
-import lombok.SneakyThrows;
+import io.openmessaging.connector.api.data.logical.Date;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import io.openmessaging.connector.api.data.logical.Time;
 import org.apache.rocketmq.connect.jdbc.config.AbstractConfig;
 import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
 import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConfig;
@@ -28,21 +29,18 @@ import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
 import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialectFactory;
 import org.apache.rocketmq.connect.jdbc.dialect.DropOptions;
 import org.apache.rocketmq.connect.jdbc.dialect.PreparedStatementBinder;
+import org.apache.rocketmq.connect.jdbc.dialect.provider.DatabaseDialectProvider;
+import org.apache.rocketmq.connect.jdbc.dialect.provider.JdbcUrlInfo;
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnDefAdjuster;
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnDefinition;
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
 import org.apache.rocketmq.connect.jdbc.schema.table.TableDefinition;
 import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
-import org.apache.rocketmq.connect.jdbc.source.metadata.ColumnMapping;
 import org.apache.rocketmq.connect.jdbc.sink.metadata.FieldsMetadata;
 import org.apache.rocketmq.connect.jdbc.sink.metadata.SchemaPair;
 import org.apache.rocketmq.connect.jdbc.sink.metadata.SinkRecordField;
-import org.apache.rocketmq.connect.jdbc.dialect.provider.DatabaseDialectProvider;
-import org.apache.rocketmq.connect.jdbc.dialect.provider.JdbcUrlInfo;
-import org.apache.rocketmq.connect.jdbc.schema.column.parser.DateColumnParser;
-import org.apache.rocketmq.connect.jdbc.schema.column.parser.TimeColumnParser;
-import org.apache.rocketmq.connect.jdbc.schema.column.parser.TimestampColumnParser;
 import org.apache.rocketmq.connect.jdbc.source.TimestampIncrementingCriteria;
+import org.apache.rocketmq.connect.jdbc.source.metadata.ColumnMapping;
 import org.apache.rocketmq.connect.jdbc.util.DateTimeUtils;
 import org.apache.rocketmq.connect.jdbc.util.ExpressionBuilder;
 import org.apache.rocketmq.connect.jdbc.util.IdentifierRules;
@@ -54,6 +52,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.sql.Blob;
@@ -125,7 +124,7 @@ public class GenericDatabaseDialect implements DatabaseDialect {
 
     private static final Logger log = LoggerFactory.getLogger(GenericDatabaseDialect.class);
 
-//    @Deprecated
+    //    @Deprecated
 //    protected final Logger log = LoggerFactory.getLogger(GenericDatabaseDialect.class);
     protected AbstractConfig config;
     /**
@@ -190,7 +189,6 @@ public class GenericDatabaseDialect implements DatabaseDialect {
      * @return
      * @throws SQLException
      */
-    @SneakyThrows
     @Override
     public Connection getConnection() throws SQLException {
         // These config names are the same for both source and sink configs ...
@@ -922,10 +920,9 @@ public class GenericDatabaseDialect implements DatabaseDialect {
     @Override
     public String addFieldToSchema(
             ColumnDefinition columnDefn,
-            Schema schema,
-            int index
+            SchemaBuilder schemaBuilder
     ) {
-        return addFieldToSchema(columnDefn, schema, fieldNameFor(columnDefn), index, columnDefn.type(),
+        return addFieldToSchema(columnDefn, schemaBuilder, fieldNameFor(columnDefn), columnDefn.type(),
                 columnDefn.isOptional()
         );
     }
@@ -936,7 +933,6 @@ public class GenericDatabaseDialect implements DatabaseDialect {
      * @param columnDefn
      * @param builder
      * @param fieldName
-     * @param index
      * @param sqlType
      * @param optional
      * @return
@@ -944,9 +940,8 @@ public class GenericDatabaseDialect implements DatabaseDialect {
     @SuppressWarnings("fallthrough")
     protected String addFieldToSchema(
             final ColumnDefinition columnDefn,
-            final Schema builder,
+            final SchemaBuilder builder,
             final String fieldName,
-            final int index,
             final int sqlType,
             final boolean optional
     ) {
@@ -958,50 +953,50 @@ public class GenericDatabaseDialect implements DatabaseDialect {
                 return null;
             }
             case Types.BOOLEAN: {
-                builder.addField(new Field(index, fieldName, SchemaBuilder.bool().build()));
+                builder.field(fieldName, SchemaBuilder.bool().build());
                 break;
             }
 
             // ints <= 8 bits
             case Types.BIT: {
-                builder.addField(new Field(index, fieldName, SchemaBuilder.int8().build()));
+                builder.field(fieldName, SchemaBuilder.int8().build());
                 break;
             }
 
             case Types.TINYINT: {
                 if (columnDefn.isSignedNumber()) {
-                    builder.addField(new Field(index, fieldName, SchemaBuilder.int8().build()));
+                    builder.field(fieldName, SchemaBuilder.int8().build());
                 } else {
-                    builder.addField(new Field(index, fieldName, SchemaBuilder.int32().build()));
+                    builder.field(fieldName, SchemaBuilder.int32().build());
                 }
                 break;
             }
 
             // 16 bit ints
             case Types.SMALLINT: {
-                builder.addField(new Field(index, fieldName, SchemaBuilder.int32().build()));
+                builder.field(fieldName, SchemaBuilder.int32().build());
                 break;
             }
 
             // 32 bit ints
             case Types.INTEGER: {
                 if (columnDefn.isSignedNumber()) {
-                    builder.addField(new Field(index, fieldName, SchemaBuilder.int32().build()));
+                    builder.field(fieldName, SchemaBuilder.int32().build());
                 } else {
-                    builder.addField(new Field(index, fieldName, SchemaBuilder.int64().build()));
+                    builder.field(fieldName, SchemaBuilder.int64().build());
                 }
                 break;
             }
 
             // 64 bit ints
             case Types.BIGINT: {
-                builder.addField(new Field(index, fieldName, SchemaBuilder.int64().build()));
+                builder.field(fieldName, SchemaBuilder.int64().build());
                 break;
             }
 
             // REAL is a single precision floating point value, i.e. a Java float
             case Types.REAL: {
-                builder.addField(new Field(index, fieldName, SchemaBuilder.float32().build()));
+                builder.field(fieldName, SchemaBuilder.float32().build());
                 break;
             }
 
@@ -1009,8 +1004,15 @@ public class GenericDatabaseDialect implements DatabaseDialect {
             // for single precision
             case Types.FLOAT:
             case Types.DOUBLE:
+                builder.field(fieldName, SchemaBuilder.float64().build());
+                break;
             case Types.DECIMAL:
-                builder.addField(new Field(index, fieldName, SchemaBuilder.float64().build()));
+                scale = decimalScale(columnDefn);
+                SchemaBuilder fieldBuilder = Decimal.builder(scale);
+                if (optional) {
+                    fieldBuilder.optional();
+                }
+                builder.field(fieldName, fieldBuilder.build());
                 break;
 
             /**
@@ -1020,17 +1022,17 @@ public class GenericDatabaseDialect implements DatabaseDialect {
                 if (mapNumerics == NumericMapping.PRECISION_ONLY) {
                     log.debug("NUMERIC with precision: '{}' and scale: '{}'", precision, scale);
                     if (scale == 0 && precision <= MAX_INTEGER_TYPE_PRECISION) { // integer
-                        builder.addField(new Field(index, fieldName, integerSchema(optional, precision)));
+                        builder.field(fieldName, integerSchema(optional, precision));
                         break;
                     }
                 } else if (mapNumerics == NumericMapping.BEST_FIT) {
                     log.debug("NUMERIC with precision: '{}' and scale: '{}'", precision, scale);
                     if (precision <= MAX_INTEGER_TYPE_PRECISION) { // fits in primitive data types.
                         if (scale < 1 && scale >= NUMERIC_TYPE_SCALE_LOW) { // integer
-                            builder.addField(new Field(index, fieldName, integerSchema(optional, precision)));
+                            builder.field(fieldName, integerSchema(optional, precision));
                             break;
                         } else if (scale > 0) { // floating point - use double in all cases
-                            builder.addField(new Field(index, fieldName, SchemaBuilder.float64().build()));
+                            builder.field(fieldName, SchemaBuilder.float64().build());
                             break;
                         }
                     }
@@ -1038,11 +1040,11 @@ public class GenericDatabaseDialect implements DatabaseDialect {
                     log.debug("NUMERIC with precision: '{}' and scale: '{}'", precision, scale);
                     if (scale < 1 && scale >= NUMERIC_TYPE_SCALE_LOW) { // integer
                         if (precision <= MAX_INTEGER_TYPE_PRECISION) { // fits in primitive data types.
-                            builder.addField(new Field(index, fieldName, integerSchema(optional, precision)));
+                            builder.field(fieldName, integerSchema(optional, precision));
                             break;
                         }
                     } else if (scale > 0) { // floating point - use double in all cases
-                        builder.addField(new Field(index, fieldName, SchemaBuilder.float64().build()));
+                        builder.field(fieldName, SchemaBuilder.float64().build());
                         break;
                     }
                 }
@@ -1059,7 +1061,7 @@ public class GenericDatabaseDialect implements DatabaseDialect {
             case Types.SQLXML: {
                 // Some of these types will have fixed size, but we drop this from the schema conversion
                 // since only fixed byte arrays can have a fixed size
-                builder.addField(new Field(index, fieldName, SchemaBuilder.string().build()));
+                builder.field(fieldName, SchemaBuilder.string().build());
                 break;
             }
 
@@ -1069,28 +1071,28 @@ public class GenericDatabaseDialect implements DatabaseDialect {
             case Types.BLOB:
             case Types.VARBINARY:
             case Types.LONGVARBINARY: {
-                builder.addField(new Field(index, fieldName, SchemaBuilder.bytes().build()));
+                builder.field(fieldName, SchemaBuilder.bytes().build());
                 break;
             }
 
             // Date is day + month + year
             case Types.DATE: {
-                SchemaBuilder dateSchemaBuilder = DateColumnParser.builder();
-                builder.addField(new Field(index, fieldName, dateSchemaBuilder.build()));
+                SchemaBuilder dateSchemaBuilder = Date.builder();
+                builder.field(fieldName, dateSchemaBuilder.build());
                 break;
             }
 
             // Time is a time of day -- hour, minute, seconds, nanoseconds
             case Types.TIME: {
-                SchemaBuilder timeSchemaBuilder = TimestampColumnParser.builder();
-                builder.addField(new Field(index, fieldName, timeSchemaBuilder.build()));
+                SchemaBuilder timeSchemaBuilder = Time.builder();
+                builder.field(fieldName, timeSchemaBuilder.build());
                 break;
             }
 
             // Timestamp is a date + time
             case Types.TIMESTAMP: {
-                SchemaBuilder tsSchemaBuilder = TimestampColumnParser.builder();
-                builder.addField(new Field(index, fieldName, tsSchemaBuilder.build()));
+                SchemaBuilder tsSchemaBuilder = io.openmessaging.connector.api.data.logical.Timestamp.builder();
+                builder.field(fieldName, tsSchemaBuilder.build());
                 break;
             }
 
@@ -1486,7 +1488,7 @@ public class GenericDatabaseDialect implements DatabaseDialect {
     }
 
     @Override
-    public final String buildDeleteStatement(
+    public String buildDeleteStatement(
             TableId table,
             Collection<ColumnId> keyColumns
     ) {
@@ -1552,7 +1554,7 @@ public class GenericDatabaseDialect implements DatabaseDialect {
                 statement.setObject(index, null);
             }
         } else {
-            boolean bound = maybeBindLogical(statement, index, schema, value, null);
+            boolean bound = maybeBindLogical(statement, index, schema, value);
             if (!bound) {
                 bound = maybeBindPrimitive(statement, index, schema, value);
             }
@@ -1566,12 +1568,14 @@ public class GenericDatabaseDialect implements DatabaseDialect {
             PreparedStatement statement,
             int index,
             Schema schema,
-            Object value,
-            ColumnDefinition colDef
+            Object value
     ) throws SQLException {
         if (schema.getName() != null) {
             switch (schema.getName()) {
-                case DateColumnParser.LOGICAL_NAME:
+                case Decimal.LOGICAL_NAME:
+                    statement.setBigDecimal(index, (BigDecimal) value);
+                    return true;
+                case Date.LOGICAL_NAME:
                     java.sql.Date date;
                     if (value instanceof java.util.Date) {
                         date = new java.sql.Date(((java.util.Date) value).getTime());
@@ -1583,7 +1587,7 @@ public class GenericDatabaseDialect implements DatabaseDialect {
                             DateTimeUtils.getTimeZoneCalendar(timeZone)
                     );
                     return true;
-                case TimeColumnParser.LOGICAL_NAME:
+                case Time.LOGICAL_NAME:
                     java.sql.Time time;
                     if (value instanceof java.util.Date) {
                         time = new java.sql.Time(((java.util.Date) value).getTime());
@@ -1595,12 +1599,12 @@ public class GenericDatabaseDialect implements DatabaseDialect {
                             DateTimeUtils.getTimeZoneCalendar(timeZone)
                     );
                     return true;
-                case TimestampColumnParser.LOGICAL_NAME:
-                    java.sql.Timestamp timestamp;
+                case io.openmessaging.connector.api.data.logical.Timestamp.LOGICAL_NAME:
+                    Timestamp timestamp;
                     if (value instanceof java.util.Date) {
-                        timestamp = new java.sql.Timestamp(((java.util.Date) value).getTime());
+                        timestamp = new Timestamp(((java.util.Date) value).getTime());
                     } else {
-                        timestamp = new java.sql.Timestamp((long) value);
+                        timestamp = new Timestamp((long) value);
                     }
                     statement.setTimestamp(
                             index, timestamp,
@@ -1674,22 +1678,22 @@ public class GenericDatabaseDialect implements DatabaseDialect {
     ) throws SQLException {
         switch (schema.getFieldType()) {
             case INT8:
-                statement.setByte(index, (Byte) value);
+                statement.setByte(index, Byte.parseByte(value.toString()));
                 break;
             case INT32:
-                statement.setInt(index, (Integer) value);
+                statement.setInt(index, Integer.parseInt(value.toString()));
                 break;
             case INT64:
-                statement.setLong(index, (Long) value);
+                statement.setLong(index, Long.parseLong(value.toString()));
                 break;
             case FLOAT32:
-                statement.setFloat(index, (Float) value);
+                statement.setFloat(index, Float.parseFloat(value.toString()));
                 break;
             case FLOAT64:
-                statement.setDouble(index, (Double) value);
+                statement.setDouble(index, Double.parseDouble(value.toString()));
                 break;
             case BOOLEAN:
-                statement.setBoolean(index, (Boolean) value);
+                statement.setBoolean(index, Boolean.parseBoolean(value.toString()));
                 break;
             case STRING:
                 statement.setString(index, (String) value);
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/MySqlDatabaseDialect.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/MySqlDatabaseDialect.java
index 0eb8724..ceb3fd0 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/MySqlDatabaseDialect.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/MySqlDatabaseDialect.java
@@ -16,15 +16,14 @@
  */
 package org.apache.rocketmq.connect.jdbc.dialect.impl;
 
-import lombok.SneakyThrows;
 import org.apache.rocketmq.connect.jdbc.config.AbstractConfig;
 import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
-import org.apache.rocketmq.connect.jdbc.sink.metadata.SinkRecordField;
 import org.apache.rocketmq.connect.jdbc.dialect.provider.DatabaseDialectProvider;
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
+import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
+import org.apache.rocketmq.connect.jdbc.sink.metadata.SinkRecordField;
 import org.apache.rocketmq.connect.jdbc.util.ExpressionBuilder;
 import org.apache.rocketmq.connect.jdbc.util.IdentifierRules;
-import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,14 +37,13 @@ import java.util.Collection;
  */
 public class MySqlDatabaseDialect extends GenericDatabaseDialect {
 
-    private final Logger log = LoggerFactory.getLogger(MySqlDatabaseDialect.class);
+    private final static Logger log = LoggerFactory.getLogger(MySqlDatabaseDialect.class);
 
     /**
      * The provider for {@link MySqlDatabaseDialect}.
      */
     public static class Provider extends DatabaseDialectProvider {
-        @SneakyThrows
-        public Provider() {
+        public Provider() throws ClassNotFoundException {
             super(MySqlDatabaseDialect.class.getSimpleName(), "mysql");
             Class.forName("com.mysql.cj.jdbc.Driver");
         }
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/OpenMLDBDatabaseDialect.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/OpenMLDBDatabaseDialect.java
index 0c70626..63198b2 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/OpenMLDBDatabaseDialect.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/OpenMLDBDatabaseDialect.java
@@ -16,14 +16,25 @@
  */
 package org.apache.rocketmq.connect.jdbc.dialect.impl;
 
-import lombok.SneakyThrows;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.logical.Date;
+import io.openmessaging.connector.api.data.logical.Time;
+import io.openmessaging.connector.api.data.logical.Timestamp;
 import org.apache.rocketmq.connect.jdbc.config.AbstractConfig;
 import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
+import org.apache.rocketmq.connect.jdbc.dialect.DropOptions;
 import org.apache.rocketmq.connect.jdbc.dialect.provider.DatabaseDialectProvider;
+import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
+import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
+import org.apache.rocketmq.connect.jdbc.sink.metadata.SinkRecordField;
+import org.apache.rocketmq.connect.jdbc.util.ExpressionBuilder;
 import org.apache.rocketmq.connect.jdbc.util.IdentifierRules;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.List;
+
 
 /**
  * openmldb database dialect
@@ -36,8 +47,7 @@ public class OpenMLDBDatabaseDialect extends GenericDatabaseDialect {
      * The provider for {@link OpenMLDBDatabaseDialect}.
      */
     public static class Provider extends DatabaseDialectProvider {
-        @SneakyThrows
-        public Provider() {
+        public Provider() throws ClassNotFoundException {
             super(OpenMLDBDatabaseDialect.class.getSimpleName(), "openmldb");
             Class.forName("com._4paradigm.openmldb.jdbc.SQLDriver");
         }
@@ -58,6 +68,97 @@ public class OpenMLDBDatabaseDialect extends GenericDatabaseDialect {
     }
 
 
+    @Override
+    protected String currentTimestampDatabaseQuery() {
+        return null;
+    }
+
+    @Override
+    protected String getSqlType(SinkRecordField field) {
+        if (field.schemaName() != null) {
+            String schema = field.schemaName();
+            switch (schema) {
+                case Timestamp.LOGICAL_NAME:
+                    return "TIMESTAMP";
+
+                case Date.LOGICAL_NAME:
+                case Time.LOGICAL_NAME:
+                    return "DATE";
+            }
+        }
+
+        switch (field.schemaType()) {
+            case INT32:
+                return "INT";
+            case INT64:
+                return "BIGINT";
+            case FLOAT32:
+                return "FLOAT";
+            case FLOAT64:
+                return "DOUBLE";
+            case BOOLEAN:
+                return "BOOL";
+            case STRING:
+                return "VARCHAR";
+            default:
+                return super.getSqlType(field);
+        }
+    }
+
+    @Override
+    public String buildCreateTableStatement(TableId table, Collection<SinkRecordField> fields) {
+        List<String> pkFieldNames = this.extractPrimaryKeyFieldNames(fields);
+        if (!pkFieldNames.isEmpty()) {
+            throw new UnsupportedOperationException("pk is unsupported in openmldb");
+        } else {
+            return super.buildCreateTableStatement(table, fields);
+        }
+    }
+
+    @Override
+    protected void writeColumnSpec(ExpressionBuilder builder, SinkRecordField f) {
+        builder.appendColumnName(f.name());
+        builder.append(" ");
+        String sqlType = this.getSqlType(f);
+        builder.append(sqlType);
+        if (f.defaultValue() != null) {
+            builder.append(" DEFAULT ");
+            this.formatColumnValue(builder, f.schemaType(), f.defaultValue());
+        } else if (!this.isColumnOptional(f)) {
+            builder.append(" NOT NULL");
+        }
+
+    }
+
+    @Override
+    public String buildDropTableStatement(TableId table, DropOptions options) {
+        ExpressionBuilder builder = this.expressionBuilder();
+        builder.append("DROP TABLE ");
+        builder.append(table);
+        return builder.toString();
+    }
+
+    @Override
+    public List<String> buildAlterTable(TableId table, Collection<SinkRecordField> fields) {
+        throw new UnsupportedOperationException("alter is unsupported");
+    }
+
+    @Override
+    public String buildUpdateStatement(TableId table, Collection<ColumnId> keyColumns, Collection<ColumnId> nonKeyColumns) {
+        throw new UnsupportedOperationException("update is unsupported");
+    }
+
+    @Override
+    public String buildDeleteStatement(TableId table, Collection<ColumnId> keyColumns) {
+        throw new UnsupportedOperationException("delete is unsupported");
+    }
+
+    @Override
+    protected Integer getSqlTypeForSchema(Schema schema) {
+        return 0;
+    }
+
+
     @Override
     protected String sanitizedUrl(String url) {
         // MySQL can also have "username:password@" at the beginning of the host list and
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java
index 87773fb..3ad19be 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java
@@ -22,12 +22,11 @@ import io.openmessaging.connector.api.errors.ConnectException;
 import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
 import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
 import org.apache.rocketmq.connect.jdbc.dialect.impl.GenericDatabaseDialect;
-import org.apache.rocketmq.connect.jdbc.dialect.impl.OpenMLDBDatabaseDialect;
+import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
 import org.apache.rocketmq.connect.jdbc.schema.db.DbStructure;
+import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
 import org.apache.rocketmq.connect.jdbc.sink.metadata.FieldsMetadata;
 import org.apache.rocketmq.connect.jdbc.sink.metadata.SchemaPair;
-import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
-import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -181,7 +180,7 @@ public class BufferedRecords {
         );
         if (totalUpdateCount.filter(total -> total != expectedCount).isPresent()
                 && config.getInsertMode() == JdbcSinkConfig.InsertMode.INSERT) {
-            if (dbDialect.name().equals(GenericDatabaseDialect.DialectName.generateDialectName(OpenMLDBDatabaseDialect.class)) && totalUpdateCount.get() == 0) {
+            if (dbDialect.name().equals(GenericDatabaseDialect.DialectName.generateDialectName(dbDialect.getDialectClass())) && totalUpdateCount.get() == 0) {
                 // OpenMLDB reports an update count of 0 even when the insert succeeds; do nothing
             } else {
                 throw new ConnectException(
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingCriteria.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingCriteria.java
index fcd612b..8b8eaa9 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingCriteria.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingCriteria.java
@@ -18,9 +18,10 @@ package org.apache.rocketmq.connect.jdbc.source;
 
 import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.Struct;
 import io.openmessaging.connector.api.errors.ConnectException;
-import org.apache.rocketmq.connect.jdbc.source.offset.TimestampIncrementingOffset;
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
+import org.apache.rocketmq.connect.jdbc.source.offset.TimestampIncrementingOffset;
 import org.apache.rocketmq.connect.jdbc.util.DateTimeUtils;
 import org.apache.rocketmq.connect.jdbc.util.ExpressionBuilder;
 import org.slf4j.Logger;
@@ -180,7 +181,7 @@ public class TimestampIncrementingCriteria {
      */
     public TimestampIncrementingOffset extractValues(
             Schema schema,
-            Object[] record,
+            Struct record,
             TimestampIncrementingOffset previousOffset
     ) {
         Timestamp extractedTimestamp = null;
@@ -211,11 +212,11 @@ public class TimestampIncrementingCriteria {
      */
     protected Timestamp extractOffsetTimestamp(
             Schema schema,
-            Object[] record
+            Struct record
     ) {
         for (ColumnId timestampColumn : timestampColumns) {
             Field field = schema.getField(timestampColumn.name());
-            Timestamp ts = (Timestamp) record[field.getIndex()];
+            Timestamp ts = (Timestamp) record.get(field);
             if (ts != null) {
                 return ts;
             }
@@ -232,7 +233,7 @@ public class TimestampIncrementingCriteria {
      */
     protected Long extractOffsetIncrementedId(
             Schema schema,
-            Object[] record
+            Struct record
     ) {
         final Long extractedId;
         final Field field = schema.getField(incrementingColumn.name());
@@ -242,7 +243,7 @@ public class TimestampIncrementingCriteria {
         }
 
         final Schema incrementingColumnSchema = field.getSchema();
-        final Object incrementingColumnValue = record[field.getIndex()];
+        final Object incrementingColumnValue = record.get(field);
         if (incrementingColumnValue == null) {
             throw new ConnectException(
                     "Null value for incrementing column of type: " + incrementingColumnSchema.getFieldType());
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/metadata/SchemaMapping.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/metadata/SchemaMapping.java
index e54cdc8..9affae6 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/metadata/SchemaMapping.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/metadata/SchemaMapping.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.connect.jdbc.source.metadata;
 import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
 import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnDefinition;
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
@@ -54,21 +55,21 @@ public final class SchemaMapping {
         // describe columns
         Map<ColumnId, ColumnDefinition> colDefins = dialect.describeColumns(conn, tableId, metadata);
         Map<String, DatabaseDialect.ColumnConverter> colConvertersByFieldName = new LinkedHashMap<>();
-        Schema builder = SchemaBuilder.struct().name(schemaName).build();
-        builder.setFields(new ArrayList<>());
+        SchemaBuilder builder = SchemaBuilder.struct().name(schemaName);
 
         int columnNumber = 0;
         for (ColumnDefinition colDefn : colDefins.values()) {
-            String fieldName = dialect.addFieldToSchema(colDefn, builder, columnNumber);
+            ++columnNumber;
+            String fieldName = dialect.addFieldToSchema(colDefn, builder);
             if (fieldName == null) {
                 continue;
             }
-            Field field = builder.getField(fieldName);
-            ColumnMapping mapping = new ColumnMapping(colDefn, ++columnNumber, field);
+            Field field = builder.field(fieldName);
+            ColumnMapping mapping = new ColumnMapping(colDefn, columnNumber, field);
             DatabaseDialect.ColumnConverter converter = dialect.createColumnConverter(mapping);
             colConvertersByFieldName.put(fieldName, converter);
         }
-        return new SchemaMapping(builder, colConvertersByFieldName);
+        return new SchemaMapping(builder.build(), colConvertersByFieldName);
     }
 
     private final Schema schema;
@@ -146,11 +147,11 @@ public final class SchemaMapping {
          * @throws IOException
          */
         public void setField(
-                Object[] payload,
+                Struct payload,
                 ResultSet resultSet
         ) throws SQLException, IOException {
             Object value = this.converter.convert(resultSet);
-            payload[field.getIndex()] = value;
+            payload.put(field, value);
         }
 
         @Override
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/BulkQuerier.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/BulkQuerier.java
index fff48a2..9afc6ed 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/BulkQuerier.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/BulkQuerier.java
@@ -20,6 +20,7 @@ import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.Struct;
 import io.openmessaging.connector.api.errors.ConnectException;
 import org.apache.rocketmq.connect.jdbc.common.JdbcSourceConfigConstants;
 import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
@@ -85,7 +86,7 @@ public class BulkQuerier extends Querier {
     @Override
     public ConnectRecord extractRecord() throws SQLException {
         Schema schema = schemaMapping.schema();
-        Object[] payload = new Object[schema.getFields().size()];
+        Struct payload = new Struct(schema);
         for (SchemaMapping.FieldSetter setter : schemaMapping.fieldSetters()) {
             try {
                 setter.setField(payload, resultSet);
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/Querier.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/Querier.java
index b97c751..8052ec2 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/Querier.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/Querier.java
@@ -19,8 +19,8 @@ package org.apache.rocketmq.connect.jdbc.source.querier;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
 import org.apache.rocketmq.connect.jdbc.dialect.provider.CachedConnectionProvider;
-import org.apache.rocketmq.connect.jdbc.source.metadata.SchemaMapping;
 import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
+import org.apache.rocketmq.connect.jdbc.source.metadata.SchemaMapping;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/TimestampIncrementingQuerier.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/TimestampIncrementingQuerier.java
index 7fbbb17..dfb6020 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/TimestampIncrementingQuerier.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/TimestampIncrementingQuerier.java
@@ -20,15 +20,16 @@ import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.Struct;
 import io.openmessaging.connector.api.errors.ConnectException;
 import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
 import org.apache.rocketmq.connect.jdbc.dialect.provider.CachedConnectionProvider;
-import org.apache.rocketmq.connect.jdbc.source.offset.SourceOffsetCompute;
-import org.apache.rocketmq.connect.jdbc.source.metadata.SchemaMapping;
-import org.apache.rocketmq.connect.jdbc.source.TimestampIncrementingCriteria;
-import org.apache.rocketmq.connect.jdbc.source.offset.TimestampIncrementingOffset;
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnDefinition;
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
+import org.apache.rocketmq.connect.jdbc.source.TimestampIncrementingCriteria;
+import org.apache.rocketmq.connect.jdbc.source.metadata.SchemaMapping;
+import org.apache.rocketmq.connect.jdbc.source.offset.SourceOffsetCompute;
+import org.apache.rocketmq.connect.jdbc.source.offset.TimestampIncrementingOffset;
 import org.apache.rocketmq.connect.jdbc.util.DateTimeUtils;
 import org.apache.rocketmq.connect.jdbc.util.ExpressionBuilder;
 import org.slf4j.Logger;
@@ -184,7 +185,7 @@ public class TimestampIncrementingQuerier extends Querier implements TimestampIn
     @Override
     public ConnectRecord extractRecord() throws SQLException {
         Schema schema = schemaMapping.schema();
-        Object[] payload = new Object[schema.getFields().size()];
+        Struct payload = new Struct(schema);
         for (SchemaMapping.FieldSetter setter : schemaMapping.fieldSetters()) {
             try {
                 setter.setField(payload, resultSet);
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/BytesUtil.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/BytesUtil.java
index b2d6667..c183a47 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/BytesUtil.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/BytesUtil.java
@@ -18,12 +18,12 @@ package org.apache.rocketmq.connect.jdbc.util;
 
 public class BytesUtil {
 
-    private static final char[] hexCode = "0123456789ABCDEF".toCharArray();
+    private static final char[] HEX_CODE = "0123456789ABCDEF".toCharArray();
     public static String toHex(byte[] data) {
         StringBuilder r = new StringBuilder(data.length * 2);
         for (byte b : data) {
-            r.append(hexCode[(b >> 4) & 0xF]);
-            r.append(hexCode[b & 0xF]);
+            r.append(HEX_CODE[(b >> 4) & 0xF]);
+            r.append(HEX_CODE[b & 0xF]);
         }
         return r.toString();
     }
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/NumericMapping.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/NumericMapping.java
index e060a5c..3bd1a6d 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/NumericMapping.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/NumericMapping.java
@@ -28,17 +28,17 @@ public enum NumericMapping {
     BEST_FIT,
     BEST_FIT_EAGER_DOUBLE;
 
-    private static final Map<String, NumericMapping> reverse = new HashMap<>(values().length);
+    private static final Map<String, NumericMapping> REVERSE = new HashMap<>(values().length);
 
     static {
         for (NumericMapping val : values()) {
-            reverse.put(val.name().toLowerCase(Locale.ROOT), val);
+            REVERSE.put(val.name().toLowerCase(Locale.ROOT), val);
         }
     }
 
     public static NumericMapping get(String prop) {
         // not adding a check for null value because the recommender/validator should catch those.
-        return reverse.get(prop.toLowerCase(Locale.ROOT));
+        return REVERSE.get(prop.toLowerCase(Locale.ROOT));
     }
 
     public static NumericMapping get(JdbcSourceConfig config) {
diff --git a/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/JdbcSinkTest.java b/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/JdbcSinkTest.java
new file mode 100644
index 0000000..ebba6b8
--- /dev/null
+++ b/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/JdbcSinkTest.java
@@ -0,0 +1,18 @@
+package org.apache.rocketmq.connect.jdbc.connector.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.connector.api.data.Struct;
+
+public class JdbcSinkTest {
+    static String test="{\"schema\":{\"fieldsByName\":{\"money\":{\"schema\":{\"optional\":true,\"fieldType\":\"FLOAT64\"},\"name\":\"money\",\"index\":5},\"name\":{\"schema\":{\"optional\":true,\"fieldType\":\"STRING\"},\"name\":\"name\",\"index\":1},\"begin_time\":{\"schema\":{\"name\":\"io.openmessaging.connector.api.data.logical.Timestamp\",\"optional\":true,\"fieldType\":\"INT64\",\"version\":1},\"name\":\"begin_time\",\"index\":6},\"company\":{\"schema\":{\"optional\":true,\"fieldT [...]
+
+    public static void main(String[] args) {
+        Struct struct = JSON.parseObject(test, Struct.class);
+        JSONObject object = JSON.parseObject(test);
+        Object[] values=object.getJSONArray("values").toArray();
+        struct.setValues(values);
+        System.out.println(object);
+    }
+
+}
diff --git a/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/OpenMLDBJdbcSinkTest.java b/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/OpenMLDBJdbcSinkTest.java
index 6d3a9d1..274db7f 100644
--- a/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/OpenMLDBJdbcSinkTest.java
+++ b/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/OpenMLDBJdbcSinkTest.java
@@ -23,6 +23,7 @@ import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkTask;
 import org.junit.After;
@@ -73,7 +74,7 @@ public class OpenMLDBJdbcSinkTest {
         }
         assertNotNull(connection);
         KeyValue config = buildConfig(jdbcUrlDB);
-        openJDBCSinkTask.init(config);
+        openJDBCSinkTask.start(config);
     }
 
     @After
@@ -95,6 +96,7 @@ public class OpenMLDBJdbcSinkTest {
         config.put("insert.mode", "INSERT");
         config.put("db.timezone", "UTC");
         config.put("table.types", "TABLE");
+        config.put("auto.create", "true");
         config.put("source-record-converter", "org.apache.rocketmq.connect.runtime.converter.JsonConverter");
         return config;
     }
@@ -106,16 +108,17 @@ public class OpenMLDBJdbcSinkTest {
     public void testOpenMLDBJdbcSinkWriterTest() throws SQLException {
         List<ConnectRecord> records = new ArrayList<>();
         // build schema
-        Schema schema = SchemaBuilder.struct().name(
-                tableName
-        ).build();
-        schema.addField(new Field(0, "c1", SchemaBuilder.int32().build()));
-        schema.addField(new Field(1, "c2", SchemaBuilder.string().build()));
+        Schema schema = SchemaBuilder.struct()
+                .name(tableName)
+                .field("c1",SchemaBuilder.int32().build())
+                .field("c2", SchemaBuilder.string().build())
+                .build();
         // build record
-        Object[] payload = new Object[2];
         int param0 = 1001;
-        payload[0] = param0;
-        payload[1] = String.format("test-data-%s", param0);
+        Struct struct= new Struct(schema);
+        struct.put("c1",param0);
+        struct.put("c2",String.format("test-data-%s", param0));
+
         ConnectRecord record = new ConnectRecord(
                 // offset partition
                 // offset partition"
@@ -123,7 +126,7 @@ public class OpenMLDBJdbcSinkTest {
                 new RecordOffset(new HashMap<>()),
                 System.currentTimeMillis(),
                 schema,
-                payload
+                struct
         );
         records.add(record);
         openJDBCSinkTask.put(records);
diff --git a/pom.xml b/pom.xml
index 0212f88..94a7e52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@
         <assertj.version>2.6.0</assertj.version>
         <mockito.version>3.2.4</mockito.version>
         <httpclient.version>4.5.13</httpclient.version>
-        <openmessaging.connector.version>0.1.2</openmessaging.connector.version>
+        <openmessaging.connector.version>0.1.3-SNAPSHOT</openmessaging.connector.version>
         <fastjson.version>1.2.83</fastjson.version>
         <javalin.version>2.8.0</javalin.version>
         <slf4j.version>1.7.7</slf4j.version>
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index 86ce305..63cf7d1 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -80,8 +80,7 @@ public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
                         transformConfig.put(originKey, config.getString(key));
                     }
                 }
-                transform.validate(transformConfig);
-                transform.init(transformConfig);
+                transform.start(transformConfig);
                 this.transformList.add(transform);
             } catch (Exception e) {
                 log.error("transform new instance error", e);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index 48bb04d..58c98ce 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -526,8 +526,9 @@ public class Worker {
                 }
             } catch (ExecutionException e) {
                 Throwable t = e.getCause();
+                log.error("Execution exception , {}", e);
             } catch (CancellationException | TimeoutException | InterruptedException e) {
-
+                log.error("error, {}", e);
             } finally {
                 future.cancel(true);
                 workerTask.cleanup();
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java
index f0433c2..2b037e0 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java
@@ -53,12 +53,12 @@ public class WorkerConnector {
     }
 
     public void initialize() {
-        connector.init(keyValue);
+        connector.init(context);
     }
 
     public void start() {
         connector.validate(keyValue);
-        connector.start(context);
+        connector.start(keyValue);
     }
 
     public void stop() {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index 2a0e6c7..67e644a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -156,37 +156,42 @@ public class WorkerDirectTask implements WorkerTask {
     }
 
     private void starkSinkTask() {
-        sinkTask.init(taskConfig);
-        sinkTask.start(new SinkTaskContext() {
+        sinkTask.init(new SinkTaskContext() {
 
-            @Override public String getConnectorName() {
+            @Override
+            public String getConnectorName() {
                 return taskConfig.getString(RuntimeConfigDefine.CONNECTOR_ID);
             }
 
-            @Override public String getTaskName() {
+            @Override
+            public String getTaskName() {
                 return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
             }
 
-            @Override public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
+            @Override
+            public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
 
             }
-
-            @Override public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
+            @Override
+            public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
 
             }
 
-            @Override public void pause(List<RecordPartition> partitions) {
+            @Override
+            public void pause(List<RecordPartition> partitions) {
 
             }
 
-            @Override public void resume(List<RecordPartition> partitions) {
+            @Override
+            public void resume(List<RecordPartition> partitions) {
 
             }
-
-            @Override public Set<RecordPartition> assignment() {
+            @Override
+            public Set<RecordPartition> assignment() {
                 return null;
             }
         });
+        sinkTask.start(taskConfig);
         log.info("Sink task start, config:{}", JSON.toJSONString(taskConfig));
     }
 
@@ -197,8 +202,7 @@ public class WorkerDirectTask implements WorkerTask {
 
     private void startSourceTask() {
         state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
-        sourceTask.init(taskConfig);
-        sourceTask.start(new SourceTaskContext() {
+        sourceTask.init(new SourceTaskContext() {
             @Override public OffsetStorageReader offsetStorageReader() {
                 return positionStorageReader;
             }
@@ -211,6 +215,7 @@ public class WorkerDirectTask implements WorkerTask {
                 return null;
             }
         });
+        sourceTask.start(taskConfig);
         state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
         log.info("Source task start, config:{}", JSON.toJSONString(taskConfig));
     }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 0587875..62adc5b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -209,9 +209,9 @@ public class WorkerSinkTask implements WorkerTask {
             consumer.start();
             log.info("Sink task consumer start. taskConfig {}", JSON.toJSONString(taskConfig));
             state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
-            sinkTask.init(taskConfig);
             this.sinkTaskContext = new WorkerSinkTaskContext(taskConfig, this, consumer);
-            sinkTask.start(sinkTaskContext);
+            sinkTask.init(sinkTaskContext);
+            sinkTask.start(taskConfig);
             // we assume executed here means we are safe
             log.info("Sink task start, config:{}", JSON.toJSONString(taskConfig));
             state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
index f2f67a0..b17cde3 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
@@ -19,6 +19,7 @@
 package org.apache.rocketmq.connect.runtime.connectorwrapper;
 
 import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
@@ -183,6 +184,16 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
         return taskConfig.getString("taskId");
     }
 
+    /**
+     * Get the configurations of current task.
+     *
+     * @return the configuration of current task.
+     */
+    @Override
+    public KeyValue configs() {
+        return taskConfig;
+    }
+
     public Map<MessageQueue, Long> queuesOffsets() {
         return this.messageQueuesOffsetMap;
     }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index da9891e..0de1f6f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -19,6 +19,7 @@
 package org.apache.rocketmq.connect.runtime.connectorwrapper;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTask;
 import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
@@ -161,8 +162,7 @@ public class WorkerSourceTask implements WorkerTask {
             producer.start();
             log.info("Source task producer start.");
             state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
-            sourceTask.init(taskConfig);
-            sourceTask.start(new SourceTaskContext() {
+            sourceTask.init(new SourceTaskContext() {
 
                 @Override
                 public OffsetStorageReader offsetStorageReader() {
@@ -178,7 +178,18 @@ public class WorkerSourceTask implements WorkerTask {
                 public String getTaskName() {
                     return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
                 }
+
+                /**
+                 * Get the configurations of current task.
+                 *
+                 * @return the configuration of current task.
+                 */
+                @Override
+                public KeyValue configs() {
+                    return taskConfig;
+                }
             });
+            sourceTask.start(taskConfig);
             state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
             log.info("Source task start, config:{}", JSON.toJSONString(taskConfig));
             while (WorkerState.STARTED == workerState.get() && WorkerTaskState.RUNNING == state.get()) {
@@ -316,7 +327,7 @@ public class WorkerSourceTask implements WorkerTask {
                     sourceMessage.setBody(messageBody);
                 }
             } else {
-                final byte[] messageBody = JSON.toJSONString(sourceDataEntry).getBytes();
+                final byte[] messageBody = JSON.toJSONString(sourceDataEntry, SerializerFeature.DisableCircularReferenceDetect).getBytes();
                 if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
                     log.error("Send record, message size is greater than {} bytes, sourceDataEntry: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(sourceDataEntry));
                     continue;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index adaa5ce..1134e1d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -185,7 +185,7 @@ public class ConfigManagementServiceImpl implements ConfigManagementService {
         }
         final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
         connector.validate(configs);
-        connector.init(configs);
+        connector.start(configs);
         connectorKeyValueStore.put(connectorName, configs);
         recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
         return "";
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
index f0736c4..5e8bb5b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -154,7 +154,7 @@ public class MemoryConfigManagementServiceImpl implements ConfigManagementServic
         }
         final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
         connector.validate(configs);
-        connector.init(configs);
+        connector.start(configs);
         connectorKeyValueStore.put(connectorName, configs);
         recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
         return "";
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java
index aede927..80ab134 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java
@@ -33,30 +33,21 @@ public class TestConnector extends Connector {
 
     }
 
-    @Override public void init(KeyValue config) {
+    /**
+     * Start the component
+     *
+     * @param config component configuration
+     */
+    @Override
+    public void start(KeyValue config) {
         this.config = config;
     }
 
-    @Override public void start(ConnectorContext componentContext) {
-
-    }
-
     @Override
     public void stop() {
 
     }
 
-
-    @Override
-    public void pause() {
-
-    }
-
-    @Override
-    public void resume() {
-
-    }
-
     @Override public List<KeyValue> taskConfigs(int maxTasks) {
         List<KeyValue> configs = new ArrayList<>();
         configs.add(this.config);
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSourceTask.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSourceTask.java
index 2c6a9ad..a131aef 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSourceTask.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSourceTask.java
@@ -49,11 +49,7 @@ public class TestSourceTask extends SourceTask {
 
     }
 
-    @Override public void init(KeyValue config) {
-
-    }
-
-    @Override public void start(SourceTaskContext sourceTaskContext) {
+    @Override public void start(KeyValue config) {
 
     }
 
@@ -62,13 +58,4 @@ public class TestSourceTask extends SourceTask {
 
     }
 
-    @Override
-    public void pause() {
-
-    }
-
-    @Override
-    public void resume() {
-
-    }
 }
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java
index 64e4582..6742325 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java
@@ -28,23 +28,23 @@ class TestTask implements Task {
 
     }
 
-    @Override public void init(KeyValue config) {
+    /**
+     * Init the component
+     *
+     * @param context component context
+     */
+    @Override
+    public void init(ComponentContext context) {
 
     }
 
-    @Override public void start(ComponentContext componentContext) {
 
-    }
-
-    @Override public void stop() {
+    @Override public void start(KeyValue config) {
 
     }
 
-    @Override public void pause() {
+    @Override public void stop() {
 
     }
 
-    @Override public void resume() {
-
-    }
 }
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkConnector.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkConnector.java
index 23de734..35a751d 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkConnector.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkConnector.java
@@ -37,21 +37,20 @@ public class FileSinkConnector extends SinkConnector {
         }
     }
 
-    @Override public void init(KeyValue config) {
+    /**
+     * Start the component
+     *
+     * @param config component configuration
+     */
+    @Override
+    public void start(KeyValue config) {
         this.config = config;
     }
 
     @Override public void stop() {
-
-    }
-
-    @Override public void pause() {
-
+        this.config = null;
     }
 
-    @Override public void resume() {
-
-    }
 
     @Override public Class<? extends Task> taskClass() {
         return FileSinkTask.class;
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java
index 08a8964..28dffd2 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java
@@ -60,7 +60,12 @@ public class FileSinkTask extends SinkTask {
         outputStream.flush();
     }
 
-    @Override public void start(SinkTaskContext sinkTaskContext) {
+    @Override public void validate(KeyValue config) {
+
+    }
+
+    @Override public void start(KeyValue config) {
+        this.config = config;
         fileConfig = new FileConfig();
         fileConfig.load(config);
         if (fileConfig.getFilename() == null || fileConfig.getFilename().isEmpty()) {
@@ -68,37 +73,22 @@ public class FileSinkTask extends SinkTask {
         } else {
             try {
                 outputStream = new PrintStream(
-                    Files.newOutputStream(Paths.get(fileConfig.getFilename()), StandardOpenOption.CREATE, StandardOpenOption.APPEND),
-                    false,
-                    StandardCharsets.UTF_8.name());
+                        Files.newOutputStream(Paths.get(fileConfig.getFilename()), StandardOpenOption.CREATE, StandardOpenOption.APPEND),
+                        false,
+                        StandardCharsets.UTF_8.name());
             } catch (IOException e) {
                 throw new ConnectException("Couldn't find or create file '" + fileConfig.getFilename() + "' for FileStreamSinkTask", e);
             }
         }
-    }
-
-    @Override public void validate(KeyValue config) {
 
     }
 
-    @Override public void init(KeyValue config) {
-        this.config = config;
-    }
-
     @Override public void stop() {
         if (outputStream != null && outputStream != System.out) {
             outputStream.close();
         }
     }
 
-    @Override public void pause() {
-
-    }
-
-    @Override public void resume() {
-
-    }
-
     private String logFilename() {
         return fileConfig.getFilename() == null ? "stdout" : fileConfig.getFilename();
     }
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceConnector.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceConnector.java
index 9a96a0f..58dd74a 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceConnector.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceConnector.java
@@ -37,21 +37,14 @@ public class FileSourceConnector extends SourceConnector {
         }
     }
 
-    @Override public void init(KeyValue config) {
+    @Override public void start(KeyValue config) {
         this.config = config;
     }
 
     @Override public void stop() {
-
-    }
-
-    @Override public void pause() {
-
+        this.config = null;
     }
 
-    @Override public void resume() {
-
-    }
 
     @Override public List<KeyValue> taskConfigs(int maxTasks) {
         List<KeyValue> config = new ArrayList<>();
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
index 8309833..420f5ac 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.connect.file;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTask;
-import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.FieldType;
@@ -202,8 +201,11 @@ public class FileSourceTask extends SourceTask {
         }
     }
 
-    @Override public void start(SourceTaskContext sourceTaskContext) {
-        this.sourceTaskContext = sourceTaskContext;
+
+    @Override public void validate(KeyValue config) { }
+
+    @Override public void start(KeyValue config) {
+        this.config = config;
         fileConfig = new FileConfig();
         fileConfig.load(config);
         log.info("fileName is:{}", fileConfig.getFilename());
@@ -214,14 +216,6 @@ public class FileSourceTask extends SourceTask {
         }
     }
 
-    @Override public void validate(KeyValue config) {
-
-    }
-
-    @Override public void init(KeyValue config) {
-        this.config = config;
-    }
-
     @Override public void stop() {
         log.trace("Stopping");
         synchronized (this) {
@@ -237,14 +231,6 @@ public class FileSourceTask extends SourceTask {
         }
     }
 
-    @Override public void pause() {
-
-    }
-
-    @Override public void resume() {
-
-    }
-
     private String logFilename() {
         return fileConfig.getFilename() == null ? "stdin" : fileConfig.getFilename();
     }
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java
index 31f57a6..d0f8f02 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java
@@ -23,6 +23,8 @@ import io.openmessaging.connector.api.data.ConnectRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
+
 public class FilterTransform implements Transform<ConnectRecord> {
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.FILE_CONNECTOR);
@@ -37,20 +39,23 @@ public class FilterTransform implements Transform<ConnectRecord> {
         return record;
     }
 
-    @Override public void validate(KeyValue config) {
-
-    }
 
-    @Override public void init(KeyValue config) {
+    /**
+     * Start the component
+     *
+     * @param config component configuration
+     */
+    @Override
+    public void start(KeyValue config) {
         this.keyValue = config;
         log.info("transform config {}", this.keyValue);
     }
 
-    @Override public void start(ComponentContext componentContext) {
-
-    }
-
-    @Override public void stop() {
+    /**
+     * Stop the component.
+     */
+    @Override
+    public void stop() {
 
     }
 }