You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/04/01 07:38:04 UTC

[rocketmq-connect] branch master updated: [ISSUE #16] [Part 4] Support matrix (#17)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new f887523  [ISSUE #16] [Part 4] Support matrix (#17)
f887523 is described below

commit f887523d5412c51dad033ea0f54205701b72a8fc
Author: zhoubo <87...@qq.com>
AuthorDate: Fri Apr 1 15:37:59 2022 +0800

    [ISSUE #16] [Part 4] Support matrix (#17)
    
    * fix load connector api repeat
    
    * reblance add try cache
    
    * supprot matrix
    
    * source connect put extension to msg property
    
    * extendion key add MQ-SYS- && connect-ext- prefix
    
    * fix file connect bug and add demo transform
    
    * fix matrix null pointer
    
    * connect ,task start stop add try cache
    
    * fix transform config lost bug
    
    * fix transform config lost bug
    
    * transform remove prefix
    
    * fix load connector api repeat
    
    * transform remove prefix
    
    * fix transform validate config error
    
    * org.slf4j load by runtime
    
    * remove transform class key config
    
    * remove transformchain main
    
    * add topic config volatile, producer send add topic log
---
 .../connect/runtime/ConnectController.java         |  24 +-
 .../connect/runtime/common/LoggerName.java         |   3 +
 .../runtime/config/RuntimeConfigDefine.java        |   1 +
 .../runtime/connectorwrapper/TransformChain.java   |  10 +-
 .../connect/runtime/connectorwrapper/Worker.java   | 215 +++++++------
 .../runtime/connectorwrapper/WorkerSinkTask.java   | 102 ++++--
 .../runtime/connectorwrapper/WorkerSourceTask.java |  83 +++--
 .../rocketmq/connect/runtime/rest/RestHandler.java |  10 +-
 .../service/ConfigManagementServiceImpl.java       |   6 +
 .../runtime/service/TaskPositionCommitService.java |   9 +-
 .../connect/runtime/stats/ConnectStats.java        |  80 +++++
 .../connect/runtime/stats/ConnectStatsManager.java | 284 +++++++++++++++++
 .../connect/runtime/stats/ConnectStatsService.java | 348 +++++++++++++++++++++
 .../connect/runtime/utils/ConnectUtil.java         |  27 ++
 .../connect/runtime/utils/PluginUtils.java         |   1 +
 .../src/main/resources/logback.xml                 |  18 ++
 .../runtime/connectorwrapper/WorkerTest.java       |  15 +-
 .../connect/runtime/rest/RestHandlerTest.java      |  12 +-
 .../apache/rocketmq/connect/file/FileSinkTask.java |  18 +-
 .../rocketmq/connect/file/FileSourceTask.java      |   3 +-
 .../rocketmq/connect/file/FilterTransform.java     |  56 ++++
 21 files changed, 1131 insertions(+), 194 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
index 71c8fda..3971656 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
@@ -37,6 +37,8 @@ import org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl
 import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
 import org.apache.rocketmq.connect.runtime.service.RebalanceService;
 import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.slf4j.Logger;
@@ -101,6 +103,10 @@ public class ConnectController {
 
     private final Plugin plugin;
 
+    private ConnectStatsManager connectStatsManager;
+
+    private final ConnectStatsService connectStatsService;
+
     public ConnectController(
         ConnectConfig connectConfig) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
 
@@ -115,13 +121,14 @@ public class ConnectController {
         }
         plugin = new Plugin(pluginPaths);
         plugin.initPlugin();
-
+        this.connectStatsManager = new ConnectStatsManager(connectConfig);
+        this.connectStatsService = new ConnectStatsService();
         this.connectConfig = connectConfig;
         this.clusterManagementService = new ClusterManagementServiceImpl(connectConfig);
         this.configManagementService = new ConfigManagementServiceImpl(connectConfig, plugin);
         this.positionManagementService = new PositionManagementServiceImpl(connectConfig);
         this.offsetManagementService = new OffsetManagementServiceImpl(connectConfig);
-        this.worker = new Worker(connectConfig, positionManagementService, offsetManagementService, plugin);
+        this.worker = new Worker(connectConfig, positionManagementService, configManagementService, plugin, this);
         AllocateConnAndTaskStrategy strategy = ConnectUtil.initAllocateConnAndTaskStrategy(connectConfig);
         this.rebalanceImpl = new RebalanceImpl(worker, configManagementService, clusterManagementService, strategy, this);
         this.restHandler = new RestHandler(this);
@@ -140,6 +147,7 @@ public class ConnectController {
         offsetManagementService.start();
         worker.start();
         rebalanceService.start();
+        connectStatsService.start();
 
         // Persist configurations of current connectors and tasks.
         this.scheduledExecutorService.scheduleAtFixedRate(() -> {
@@ -233,4 +241,16 @@ public class ConnectController {
     public RebalanceImpl getRebalanceImpl() {
         return rebalanceImpl;
     }
+
+    public ConnectStatsManager getConnectStatsManager() {
+        return connectStatsManager;
+    }
+
+    public void setConnectStatsManager(ConnectStatsManager connectStatsManager) {
+        this.connectStatsManager = connectStatsManager;
+    }
+
+    public ConnectStatsService getConnectStatsService() {
+        return connectStatsService;
+    }
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/LoggerName.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/LoggerName.java
index b01dc36..2b62956 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/LoggerName.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/LoggerName.java
@@ -22,4 +22,7 @@ package org.apache.rocketmq.connect.runtime.common;
  */
 public class LoggerName {
     public static final String ROCKETMQ_RUNTIME = "RocketMQRuntime";
+    public static final String BROKER_BASE_LOG = "BrokerBaseLog";
+    public static final String COMMON_LOGGER_NAME = "ConnectCommon";
+    public static final String ROCKETMQ_CONNECT_STATS = "RocketmqConnectStats";
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
index 7885475..d7a13e2 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
@@ -99,6 +99,7 @@ public class RuntimeConfigDefine {
     public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
         {
             add(CONNECTOR_CLASS);
+            add(CONNECT_TOPICNAME);
         }
     };
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index ae89572..3de0222 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -68,17 +68,19 @@ public class TransformChain<R extends ConnectRecord> {
             return;
         }
         transformList.stream().forEach(transformStr -> {
-            String transformClass = config.getString(PREFIX + transformStr + "-class");
+            String transformClassKey = PREFIX + transformStr + "-class";
+            String transformClass = config.getString(transformClassKey);
             try {
                 Transform transform = getTransform(transformClass);
-                transform.validate(config);
                 KeyValue transformConfig = new DefaultKeyValue();
                 Set<String> configKeys = config.keySet();
                 for (String key : configKeys) {
-                    if (key.startsWith(PREFIX + transformStr)) {
-                        transformConfig.put(key, config.getString(key));
+                    if (key.startsWith(PREFIX + transformStr) && !key.equals(transformClassKey)) {
+                        String originKey = key.replace(PREFIX + transformStr + "-", "");
+                        transformConfig.put(originKey, config.getString(key));
                     }
                 }
+                transform.validate(transformConfig);
                 transform.init(transformConfig);
                 this.transformList.add(transform);
             } catch (Exception e) {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index b899509..e22f969 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.connect.runtime.connectorwrapper;
 
+import com.alibaba.fastjson.JSON;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.netty.util.internal.ConcurrentSet;
 import io.openmessaging.connector.api.component.connector.Connector;
@@ -48,9 +49,12 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 import org.apache.rocketmq.connect.runtime.service.TaskPositionCommitService;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.PluginClassLoader;
@@ -86,15 +90,12 @@ public class Worker {
 
     private Set<Runnable> cleanedStoppedTasks = new ConcurrentSet<>();
 
-
     Map<String, List<ConnectKeyValue>> latestTaskConfigs = new HashMap<>();
     /**
      * Current running tasks to its Future map.
      */
     private Map<Runnable, Future> taskToFutureMap = new ConcurrentHashMap<>();
 
-
-
     /**
      * Thread pool for connectors and tasks.
      */
@@ -106,11 +107,6 @@ public class Worker {
     private final PositionManagementService positionManagementService;
 
     /**
-     * Offset management for source tasks.
-     */
-    private final PositionManagementService offsetManagementService;
-
-    /**
      * A scheduled task to commit source position of source tasks.
      */
     private final TaskPositionCommitService taskPositionCommitService;
@@ -119,30 +115,33 @@ public class Worker {
 
     private final Plugin plugin;
 
-    private  static final int MAX_START_TIMEOUT_MILLS = 5000;
+    private static final int MAX_START_TIMEOUT_MILLS = 5000;
 
-    private  static final long MAX_STOP_TIMEOUT_MILLS = 20000;
+    private static final long MAX_STOP_TIMEOUT_MILLS = 20000;
 
     /**
      * Atomic state variable
      */
     private AtomicReference<WorkerState> workerState;
 
-
     private StateMachineService stateMachineService = new StateMachineService();
 
+    private final ConnectStatsManager connectStatsManager;
+
+    private final ConnectStatsService connectStatsService;
+
     public Worker(ConnectConfig connectConfig,
-                  PositionManagementService positionManagementService, PositionManagementService offsetManagementService,
-                  Plugin plugin) {
+        PositionManagementService positionManagementService, ConfigManagementService configManagementService,
+        Plugin plugin, ConnectController connectController) {
         this.connectConfig = connectConfig;
         this.taskExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("task-Worker-Executor-"));
         this.positionManagementService = positionManagementService;
-        this.offsetManagementService = offsetManagementService;
         this.taskPositionCommitService = new TaskPositionCommitService(
             this,
-            positionManagementService,
-            offsetManagementService);
+            positionManagementService);
         this.plugin = plugin;
+        this.connectStatsManager = connectController.getConnectStatsManager();
+        this.connectStatsService = connectController.getConnectStatsService();
     }
 
     public void start() {
@@ -160,17 +159,21 @@ public class Worker {
      * @throws Exception
      */
     public synchronized void startConnectors(Map<String, ConnectKeyValue> connectorConfigs,
-                                             ConnectController connectController) throws Exception {
+        ConnectController connectController) throws Exception {
         Set<WorkerConnector> stoppedConnector = new HashSet<>();
         for (WorkerConnector workerConnector : workingConnectors) {
-            String connectorName = workerConnector.getConnectorName();
-            ConnectKeyValue keyValue = connectorConfigs.get(connectorName);
-            if (null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED)) {
-                workerConnector.stop();
-                log.info("Connector {} stop", workerConnector.getConnectorName());
-                stoppedConnector.add(workerConnector);
-            } else if (!keyValue.equals(workerConnector.getKeyValue())) {
-                workerConnector.reconfigure(keyValue);
+            try {
+                String connectorName = workerConnector.getConnectorName();
+                ConnectKeyValue keyValue = connectorConfigs.get(connectorName);
+                if (null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED)) {
+                    workerConnector.stop();
+                    log.info("Connector {} stop", workerConnector.getConnectorName());
+                    stoppedConnector.add(workerConnector);
+                } else if (!keyValue.equals(workerConnector.getKeyValue())) {
+                    workerConnector.reconfigure(keyValue);
+                }
+            } catch (Exception e) {
+                log.error("stop or reconfigure connector error, connectName: " + workerConnector.getConnectorName(), e);
             }
         }
         workingConnectors.removeAll(stoppedConnector);
@@ -193,28 +196,32 @@ public class Worker {
         }
 
         for (String connectorName : newConnectors.keySet()) {
-            ConnectKeyValue keyValue = newConnectors.get(connectorName);
-            String connectorClass = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
-            ClassLoader loader = plugin.getPluginClassLoader(connectorClass);
-            final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
-            Class clazz;
-            boolean isolationFlag = false;
-            if (loader instanceof PluginClassLoader) {
-                clazz = ((PluginClassLoader) loader).loadClass(connectorClass, false);
-                isolationFlag = true;
-            } else {
-                clazz = Class.forName(connectorClass);
-            }
-            final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
-            WorkerConnector workerConnector = new WorkerConnector(connectorName, connector, connectorConfigs.get(connectorName), new DefaultConnectorContext(connectorName, connectController));
-            if (isolationFlag) {
-                Plugin.compareAndSwapLoaders(loader);
+            try {
+                ConnectKeyValue keyValue = newConnectors.get(connectorName);
+                String connectorClass = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
+                ClassLoader loader = plugin.getPluginClassLoader(connectorClass);
+                final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
+                Class clazz;
+                boolean isolationFlag = false;
+                if (loader instanceof PluginClassLoader) {
+                    clazz = ((PluginClassLoader) loader).loadClass(connectorClass, false);
+                    isolationFlag = true;
+                } else {
+                    clazz = Class.forName(connectorClass);
+                }
+                final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
+                WorkerConnector workerConnector = new WorkerConnector(connectorName, connector, connectorConfigs.get(connectorName), new DefaultConnectorContext(connectorName, connectController));
+                if (isolationFlag) {
+                    Plugin.compareAndSwapLoaders(loader);
+                }
+                workerConnector.initialize();
+                workerConnector.start();
+                log.info("Connector {} start", workerConnector.getConnectorName());
+                Plugin.compareAndSwapLoaders(currentThreadLoader);
+                this.workingConnectors.add(workerConnector);
+            } catch (Exception e) {
+                log.error("worker connector start exception. workerName: " + connectorName, e);
             }
-            workerConnector.initialize();
-            workerConnector.start();
-            log.info("Connector {} start", workerConnector.getConnectorName());
-            Plugin.compareAndSwapLoaders(currentThreadLoader);
-            this.workingConnectors.add(workerConnector);
         }
     }
 
@@ -231,7 +238,6 @@ public class Worker {
         }
     }
 
-
     private boolean isConfigInSet(ConnectKeyValue keyValue, Set<Runnable> set) {
         for (Runnable runnable : set) {
             ConnectKeyValue taskConfig = null;
@@ -268,15 +274,15 @@ public class Worker {
     }
 
     public void setWorkingConnectors(
-            Set<WorkerConnector> workingConnectors) {
+        Set<WorkerConnector> workingConnectors) {
         this.workingConnectors = workingConnectors;
     }
 
-
     /**
      * Beaware that we are not creating a defensive copy of these tasks
      * So developers should only use these references for read-only purposes.
      * These variables should be immutable
+     *
      * @return
      */
     public Set<Runnable> getWorkingTasks() {
@@ -311,7 +317,6 @@ public class Worker {
         this.runningTasks = workingTasks;
     }
 
-
     public void maintainConnectorState() {
 
     }
@@ -332,7 +337,6 @@ public class Worker {
             List<ConnectKeyValue> keyValues = taskConfigs.get(connectorName);
             WorkerTaskState state = ((WorkerTask) runnable).getState();
 
-
             if (WorkerTaskState.ERROR == state) {
                 errorTasks.add(runnable);
                 runningTasks.remove(runnable);
@@ -347,10 +351,12 @@ public class Worker {
                     }
                 }
 
-
                 if (needStop) {
-                    workerTask.stop();
-
+                    try {
+                        workerTask.stop();
+                    } catch (Exception e) {
+                        log.error("workerTask stop exception, workerTask: " + workerTask.getTaskConfig(), e);
+                    }
                     log.info("Task stopping, connector name {}, config {}", workerTask.getConnectorName(), workerTask.getTaskConfig());
                     runningTasks.remove(runnable);
                     stoppingTasks.put(runnable, System.currentTimeMillis());
@@ -393,55 +399,58 @@ public class Worker {
                     createDirectTask(connectorName, keyValue);
                     continue;
                 }
-
-                String taskClass = keyValue.getString(RuntimeConfigDefine.TASK_CLASS);
-                ClassLoader loader = plugin.getPluginClassLoader(taskClass);
-                final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
-                Class taskClazz;
-                boolean isolationFlag = false;
-                if (loader instanceof PluginClassLoader) {
-                    taskClazz = ((PluginClassLoader) loader).loadClass(taskClass, false);
-                    isolationFlag = true;
-                } else {
-                    taskClazz = Class.forName(taskClass);
-                }
-                final Task task = (Task) taskClazz.getDeclaredConstructor().newInstance();
-                final String converterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER);
-                Converter recordConverter = null;
-                if (StringUtils.isNotEmpty(converterClazzName)) {
-                    Class converterClazz = Class.forName(converterClazzName);
-                    recordConverter = (Converter) converterClazz.newInstance();
-                }
-                if (isolationFlag) {
-                    Plugin.compareAndSwapLoaders(loader);
-                }
-                if (task instanceof SourceTask) {
-                    DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(connectConfig);
-                    TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
-                    WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName,
-                        (SourceTask) task, keyValue, positionManagementService, recordConverter, producer, workerState, transformChain);
-                    Plugin.compareAndSwapLoaders(currentThreadLoader);
-
-                    Future future = taskExecutor.submit(workerSourceTask);
-                    taskToFutureMap.put(workerSourceTask, future);
-                    this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());
-                } else if (task instanceof SinkTask) {
-                    DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig);
-                    if (connectConfig.isAutoCreateGroupEnable()) {
-                        ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
+                try {
+                    String taskClass = keyValue.getString(RuntimeConfigDefine.TASK_CLASS);
+                    ClassLoader loader = plugin.getPluginClassLoader(taskClass);
+                    final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
+                    Class taskClazz;
+                    boolean isolationFlag = false;
+                    if (loader instanceof PluginClassLoader) {
+                        taskClazz = ((PluginClassLoader) loader).loadClass(taskClass, false);
+                        isolationFlag = true;
+                    } else {
+                        taskClazz = Class.forName(taskClass);
                     }
-                    TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
-                    WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName,
-                        (SinkTask) task, keyValue, offsetManagementService, recordConverter, consumer, workerState, transformChain);
-                    Plugin.compareAndSwapLoaders(currentThreadLoader);
-                    Future future = taskExecutor.submit(workerSinkTask);
-                    taskToFutureMap.put(workerSinkTask, future);
-                    this.pendingTasks.put(workerSinkTask, System.currentTimeMillis());
+                    final Task task = (Task) taskClazz.getDeclaredConstructor().newInstance();
+                    final String converterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER);
+                    Converter recordConverter = null;
+                    if (StringUtils.isNotEmpty(converterClazzName)) {
+                        Class converterClazz = Class.forName(converterClazzName);
+                        recordConverter = (Converter) converterClazz.newInstance();
+                    }
+                    if (isolationFlag) {
+                        Plugin.compareAndSwapLoaders(loader);
+                    }
+                    if (task instanceof SourceTask) {
+                        DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(connectConfig);
+                        TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
+                        WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName,
+                            (SourceTask) task, keyValue, positionManagementService, recordConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain);
+                        Plugin.compareAndSwapLoaders(currentThreadLoader);
+
+                        Future future = taskExecutor.submit(workerSourceTask);
+                        taskToFutureMap.put(workerSourceTask, future);
+                        this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());
+                    } else if (task instanceof SinkTask) {
+                        log.info("sink task config keyValue is {}", keyValue.getProperties());
+                        DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig, connectorName, keyValue);
+                        if (connectConfig.isAutoCreateGroupEnable()) {
+                            ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
+                        }
+                        TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
+                        WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName,
+                            (SinkTask) task, keyValue, recordConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain);
+                        Plugin.compareAndSwapLoaders(currentThreadLoader);
+                        Future future = taskExecutor.submit(workerSinkTask);
+                        taskToFutureMap.put(workerSinkTask, future);
+                        this.pendingTasks.put(workerSinkTask, System.currentTimeMillis());
+                    }
+                } catch (Exception e) {
+                    log.error("start worker task exception. config {}" + JSON.toJSONString(keyValue), e);
                 }
             }
         }
 
-
         //  STEP 3: check all pending state
         for (Map.Entry<Runnable, Long> entry : pendingTasks.entrySet()) {
             Runnable runnable = entry.getKey();
@@ -503,7 +512,7 @@ public class Worker {
         }
 
         //  STEP 5 check errorTasks and stopped tasks
-        for (Runnable runnable: errorTasks) {
+        for (Runnable runnable : errorTasks) {
             WorkerTask workerTask = (WorkerTask) runnable;
             Future future = taskToFutureMap.get(runnable);
 
@@ -515,7 +524,7 @@ public class Worker {
                 }
             } catch (ExecutionException e) {
                 Throwable t = e.getCause();
-            } catch (CancellationException | TimeoutException |  InterruptedException e) {
+            } catch (CancellationException | TimeoutException | InterruptedException e) {
 
             } finally {
                 future.cancel(true);
@@ -527,9 +536,8 @@ public class Worker {
             }
         }
 
-
         //  STEP 5 check errorTasks and stopped tasks
-        for (Runnable runnable: stoppedTasks) {
+        for (Runnable runnable : stoppedTasks) {
             WorkerTask workerTask = (WorkerTask) runnable;
             workerTask.cleanup();
             Future future = taskToFutureMap.get(runnable);
@@ -552,8 +560,7 @@ public class Worker {
             } catch (InterruptedException e) {
                 log.info("[BUG] Stopped Tasks should not throw any exception");
                 e.printStackTrace();
-            }
-            finally {
+            } finally {
                 future.cancel(true);
                 taskToFutureMap.remove(runnable);
                 stoppedTasks.remove(runnable);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 0499534..7b1721b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -27,7 +27,6 @@ import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.connector.api.errors.ConnectException;
 import io.openmessaging.connector.api.errors.RetriableException;
-import io.openmessaging.connector.api.storage.OffsetStorageReader;
 import io.openmessaging.internal.DefaultKeyValue;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -41,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -60,8 +60,8 @@ import org.apache.rocketmq.connect.runtime.common.QueueState;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
 import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
 import org.apache.rocketmq.connect.runtime.converter.RocketMQConverter;
-import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
-import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
@@ -115,13 +115,6 @@ public class WorkerSinkTask implements WorkerTask {
      */
     private final DefaultMQPullConsumer consumer;
 
-    private final PositionManagementService offsetManagementService;
-
-    /**
-     *
-     */
-    private final OffsetStorageReader offsetStorageReader;
-
     /**
      * A converter to parse sink data entry to object.
      */
@@ -152,6 +145,10 @@ public class WorkerSinkTask implements WorkerTask {
 
     private final AtomicReference<WorkerState> workerState;
 
+    private final ConnectStatsManager connectStatsManager;
+
+    private final ConnectStatsService connectStatsService;
+
     private final CountDownLatch stopPullMsgLatch;
 
     private WorkerSinkTaskContext sinkTaskContext;
@@ -163,25 +160,38 @@ public class WorkerSinkTask implements WorkerTask {
     public static final String TOPIC = "topic";
     public static final String QUEUE_OFFSET = "queueOffset";
 
+    private static final Set<String> MQ_SYS_KEYS = new HashSet<String>() {
+        {
+            add("MIN_OFFSET");
+            add("TRACE_ON");
+            add("MAX_OFFSET");
+            add("MSG_REGION");
+            add("UNIQ_KEY");
+            add("WAIT");
+            add("TAGS");
+        }
+    };
+
     public WorkerSinkTask(String connectorName,
         SinkTask sinkTask,
         ConnectKeyValue taskConfig,
-        PositionManagementService offsetManagementService,
         Converter recordConverter,
         DefaultMQPullConsumer consumer,
         AtomicReference<WorkerState> workerState,
+        ConnectStatsManager connectStatsManager,
+        ConnectStatsService connectStatsService,
         TransformChain<ConnectRecord> transformChain) {
         this.connectorName = connectorName;
         this.sinkTask = sinkTask;
         this.taskConfig = taskConfig;
         this.consumer = consumer;
-        this.offsetManagementService = offsetManagementService;
-        this.offsetStorageReader = new PositionStorageReaderImpl(offsetManagementService);
         this.recordConverter = recordConverter;
         this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256);
         this.messageQueuesStateMap = new ConcurrentHashMap<>(256);
         this.state = new AtomicReference<>(WorkerTaskState.NEW);
         this.workerState = workerState;
+        this.connectStatsManager = connectStatsManager;
+        this.connectStatsService = connectStatsService;
         this.stopPullMsgLatch = new CountDownLatch(1);
         this.transformChain = transformChain;
     }
@@ -210,13 +220,19 @@ public class WorkerSinkTask implements WorkerTask {
                     setQueueOffset();
                     pullMessageFromQueues();
                 } catch (RetriableException e) {
+                    connectStatsManager.incSinkRecordPutTotalFailNums();
+                    connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                     log.error("Sink task RetriableException exception", e);
                 } catch (InterruptedException e) {
+                    connectStatsManager.incSinkRecordPutTotalFailNums();
+                    connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                     log.error("Sink task InterruptedException exception", e);
                     throw e;
                 } catch (Throwable e) {
                     state.set(WorkerTaskState.ERROR);
-                    log.error("sink task {}, pull message MQClientException, Error {} ", this, e.getMessage(), e);
+                    log.error(" sink task {},pull message MQClientException, Error {} ", this, e.getMessage(), e);
+                    connectStatsManager.incSinkRecordPutTotalFailNums();
+                    connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                 }
             }
 
@@ -230,7 +246,7 @@ public class WorkerSinkTask implements WorkerTask {
         } finally {
             if (consumer != null) {
                 consumer.shutdown();
-                log.info("Sink task consumer shutdown.");
+                log.info("Sink task consumer shutdown. config:{}", JSON.toJSONString(taskConfig));
             }
         }
     }
@@ -350,26 +366,52 @@ public class WorkerSinkTask implements WorkerTask {
                 break;
             }
             PullResult pullResult = null;
+            final long beginPullMsgTimestamp = System.currentTimeMillis();
             try {
                 shouldStopPullMsg();
                 pullResult = consumer.pullBlockIfNotFound(entry.getKey(), "*", entry.getValue(), MAX_MESSAGE_NUM);
                 pullMsgErrorCount = 0;
             } catch (MQClientException e) {
                 pullMsgErrorCount++;
-                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQClientException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQClientException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+                connectStatsManager.incSinkRecordReadTotalFailNums();
+                connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+                long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
+                connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
+                connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
             } catch (RemotingException e) {
                 pullMsgErrorCount++;
-                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message RemotingException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message RemotingException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+                connectStatsManager.incSinkRecordReadTotalFailNums();
+                connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+                long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
+                connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
+                connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
             } catch (MQBrokerException e) {
                 pullMsgErrorCount++;
-                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQBrokerException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQBrokerException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+                connectStatsManager.incSinkRecordReadTotalFailNums();
+                connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+                long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
+                connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
+                connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
             } catch (InterruptedException e) {
                 pullMsgErrorCount++;
-                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message InterruptedException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message InterruptedException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+                connectStatsManager.incSinkRecordReadTotalFailNums();
+                connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+                long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
+                connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
+                connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
                 throw e;
             } catch (Throwable e) {
                 pullMsgErrorCount++;
-                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message Throwable, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), e);
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message Throwable, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), e);
+                connectStatsManager.incSinkRecordReadTotalFailNums();
+                connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+                long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
+                connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
+                connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
                 throw e;
             }
             long currentTime = System.currentTimeMillis();
@@ -379,11 +421,16 @@ public class WorkerSinkTask implements WorkerTask {
             if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.FOUND)) {
                 this.incPullTPS(entry.getKey().getTopic(), pullResult.getMsgFoundList().size());
                 messages = pullResult.getMsgFoundList();
+                connectStatsManager.incSinkRecordReadTotalNums();
+                connectStatsManager.incSinkRecordReadNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID), messages.size());
+                long pullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
+                connectStatsManager.incSinkRecordReadTotalRT(pullRT);
+                connectStatsManager.incSinkRecordReadRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), pullRT);
                 receiveMessages(messages);
                 if (messageQueuesOffsetMap.containsKey(entry.getKey())) {
                     messageQueuesOffsetMap.put(entry.getKey(), pullResult.getNextBeginOffset());
                 } else {
-                    log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey()));
+                    log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey()));
                 }
                 try {
                     consumer.updateConsumeOffset(entry.getKey(), pullResult.getNextBeginOffset());
@@ -401,6 +448,11 @@ public class WorkerSinkTask implements WorkerTask {
             } else {
                 log.info("no new message, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
             }
+
+            AtomicLong atomicLong = connectStatsService.singleSinkTaskTimesTotal(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+            if (null != atomicLong) {
+                atomicLong.addAndGet(org.apache.commons.collections4.CollectionUtils.isEmpty(messages) ? 0 : messages.size());
+            }
         }
     }
 
@@ -516,7 +568,13 @@ public class WorkerSinkTask implements WorkerTask {
             KeyValue keyValue = new DefaultKeyValue();
             if (MapUtils.isNotEmpty(properties)) {
                 for (Map.Entry<String, String> entry : properties.entrySet()) {
-                    keyValue.put(entry.getKey(), entry.getValue());
+                    if (MQ_SYS_KEYS.contains(entry.getKey())) {
+                        keyValue.put("MQ-SYS-" + entry.getKey(), entry.getValue());
+                    } else if (entry.getKey().startsWith("connect-ext-")){
+                        keyValue.put(entry.getKey().replaceAll("connect-ext-", ""), entry.getValue());
+                    } else {
+                        keyValue.put(entry.getKey(), entry.getValue());
+                    }
                 }
             }
             sinkDataEntry.addExtension(keyValue);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 5da46b7..36a10fd 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -19,6 +19,7 @@
 package org.apache.rocketmq.connect.runtime.connectorwrapper;
 
 import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTask;
 import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
@@ -26,14 +27,15 @@ import io.openmessaging.connector.api.data.Converter;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.RecordPosition;
-import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.errors.ConnectException;
 import io.openmessaging.connector.api.errors.RetriableException;
 import io.openmessaging.connector.api.storage.OffsetStorageReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -47,6 +49,8 @@ import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
 import org.apache.rocketmq.connect.runtime.converter.RocketMQConverter;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
@@ -100,6 +104,10 @@ public class WorkerSourceTask implements WorkerTask {
 
     private final AtomicReference<WorkerState> workerState;
 
+    private ConnectStatsManager connectStatsManager;
+
+    private ConnectStatsService connectStatsService;
+
     private List<ConnectRecord> toSendRecord;
 
     private TransformChain<ConnectRecord> transformChain;
@@ -111,6 +119,8 @@ public class WorkerSourceTask implements WorkerTask {
         Converter recordConverter,
         DefaultMQProducer producer,
         AtomicReference<WorkerState> workerState,
+        ConnectStatsManager connectStatsManager,
+        ConnectStatsService connectStatsService,
         TransformChain<ConnectRecord> transformChain) {
         this.connectorName = connectorName;
         this.sourceTask = sourceTask;
@@ -121,6 +131,8 @@ public class WorkerSourceTask implements WorkerTask {
         this.recordConverter = recordConverter;
         this.state = new AtomicReference<>(WorkerTaskState.NEW);
         this.workerState = workerState;
+        this.connectStatsManager = connectStatsManager;
+        this.connectStatsService = connectStatsService;
         this.transformChain = transformChain;
     }
 
@@ -155,22 +167,31 @@ public class WorkerSourceTask implements WorkerTask {
                     try {
                         toSendRecord = poll();
                         if (null != toSendRecord && toSendRecord.size() > 0) {
+                            connectStatsManager.incSourceRecordPollTotalNums();
+                            connectStatsManager.incSourceRecordPollNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                             sendRecord();
                         }
                     } catch (RetriableException e) {
+                        connectStatsManager.incSourceRecordPollTotalFailNums();
+                        connectStatsManager.incSourceRecordPollFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                         log.error("Source task RetriableException exception", e);
                     } catch (Exception e) {
+                        connectStatsManager.incSourceRecordPollTotalFailNums();
+                        connectStatsManager.incSourceRecordPollFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                         log.error("Source task RetriableException exception", e);
                         state.set(WorkerTaskState.ERROR);
                     }
                 }
-
+                AtomicLong atomicLong = connectStatsService.singleSourceTaskTimesTotal(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+                if (null != atomicLong) {
+                    atomicLong.addAndGet(toSendRecord == null ? 0 : toSendRecord.size());
+                }
             }
             sourceTask.stop();
             state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED);
             log.info("Source task stop, config:{}", JSON.toJSONString(taskConfig));
         } catch (Exception e) {
-            log.error("Run task failed., task config {}", JSON.toJSONString(taskConfig), e);
+            log.error("Run task failed., task config: " +  JSON.toJSONString(taskConfig), e);
             state.set(WorkerTaskState.ERROR);
         } finally {
             if (producer != null) {
@@ -184,6 +205,9 @@ public class WorkerSourceTask implements WorkerTask {
         List<ConnectRecord> connectRecordList = null;
         try {
             connectRecordList = sourceTask.poll();
+            if (CollectionUtils.isEmpty(connectRecordList)) {
+                return null;
+            }
             List<ConnectRecord> connectRecordList1 = new ArrayList<>(32);
             for (ConnectRecord connectRecord : connectRecordList) {
                 ConnectRecord connectRecord1 = this.transformChain.doTransforms(connectRecord);
@@ -249,6 +273,9 @@ public class WorkerSourceTask implements WorkerTask {
                 }
                 topic = (String) o;
             }
+            if (StringUtils.isBlank(topic)) {
+                throw new ConnectException("source connect lack of topic config");
+            }
             sourceMessage.setTopic(topic);
             if (null == recordConverter || recordConverter instanceof RocketMQConverter) {
                 putExtendMsgProperty(sourceDataEntry, sourceMessage, topic);
@@ -272,7 +299,9 @@ public class WorkerSourceTask implements WorkerTask {
             try {
                 producer.send(sourceMessage, new SendCallback() {
                     @Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) {
-                        log.info("Successful send message to RocketMQ:{}", result.getMsgId());
+                        log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
+                        connectStatsManager.incSourceRecordWriteTotalNums();
+                        connectStatsManager.incSourceRecordWriteNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                         RecordPartition partition = position.getPartition();
                         try {
                             if (null != partition && null != position) {
@@ -288,14 +317,22 @@ public class WorkerSourceTask implements WorkerTask {
 
                     @Override public void onException(Throwable throwable) {
                         log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(sourceMessage), throwable);
+                        connectStatsManager.incSourceRecordWriteTotalFailNums();
+                        connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                     }
                 });
             } catch (MQClientException e) {
                 log.error("Send message MQClientException. message: {}, error info: {}.", sourceMessage, e);
+                connectStatsManager.incSourceRecordWriteTotalFailNums();
+                connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
             } catch (RemotingException e) {
                 log.error("Send message RemotingException. message: {}, error info: {}.", sourceMessage, e);
+                connectStatsManager.incSourceRecordWriteTotalFailNums();
+                connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
             } catch (InterruptedException e) {
                 log.error("Send message InterruptedException. message: {}, error info: {}.", sourceMessage, e);
+                connectStatsManager.incSourceRecordWriteTotalFailNums();
+                connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                 throw e;
             }
         }
@@ -303,34 +340,18 @@ public class WorkerSourceTask implements WorkerTask {
     }
 
     private void putExtendMsgProperty(ConnectRecord sourceDataEntry, Message sourceMessage, String topic) {
-        String shardingKey = sourceDataEntry.getExtension(RuntimeConfigDefine.CONNECT_SHARDINGKEY);
-        if (StringUtils.isNotEmpty(shardingKey)) {
-            MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_SHARDINGKEY, shardingKey);
-        }
-        if (StringUtils.isNotEmpty(topic)) {
-            MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_TOPICNAME, topic);
-        }
-        String sourcePartition = sourceDataEntry.getExtension(RuntimeConfigDefine.CONNECT_SOURCE_PARTITION);
-        if (StringUtils.isNotEmpty(sourcePartition)) {
-            MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_SOURCE_PARTITION, sourcePartition);
-        }
-        String sourcePosition = sourceDataEntry.getExtension(RuntimeConfigDefine.CONNECT_SOURCE_POSITION);
-        if (StringUtils.isNotEmpty(sourcePosition)) {
-            MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_SOURCE_POSITION, sourcePosition);
-        }
-        String entryType = sourceDataEntry.getExtension(RuntimeConfigDefine.CONNECT_ENTRYTYPE);
-        if (StringUtils.isNotEmpty(entryType)) {
-            MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_ENTRYTYPE, entryType);
+        KeyValue extensionKeyValues = sourceDataEntry.getExtensions();
+        if (null == extensionKeyValues) {
+            log.info("extension key value is null.");
+            return;
         }
-        Long timestamp = sourceDataEntry.getTimestamp();
-        Optional<Long> otimestamp = Optional.ofNullable(timestamp);
-        if (otimestamp.isPresent()) {
-            MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_TIMESTAMP, otimestamp.get().toString());
+        Set<String> keySet = extensionKeyValues.keySet();
+        if (CollectionUtils.isEmpty(keySet)) {
+            log.info("extension keySet null.");
+            return;
         }
-        Schema schema = sourceDataEntry.getSchema();
-        Optional<Schema> oschema = Optional.ofNullable(schema);
-        if (oschema.isPresent()) {
-            MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_SCHEMA, JSON.toJSONString(oschema.get()));
+        for (String key : keySet) {
+            MessageAccessor.putProperty(sourceMessage, "connect-ext-" + key, extensionKeyValues.getString(key));
         }
     }
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
index b9a7fe3..b2a3a5b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
@@ -57,6 +57,7 @@ public class RestHandler {
         app.get("/connectors/enableAll", this::handleEnableAllConnector);
         app.get("/connectors/disableAll", this::handleDisableAllConnector);
         app.get("/connectors/:connectorName", this::handleCreateConnector);
+        app.post("/connectors/:connectorName", this::handleCreateConnector);
         app.get("/connectors/:connectorName/config", this::handleQueryConnectorConfig);
         app.get("/connectors/:connectorName/status", this::handleQueryConnectorStatus);
         app.get("/connectors/:connectorName/stop", this::handleStopConnector);
@@ -124,7 +125,12 @@ public class RestHandler {
 
     private void handleCreateConnector(Context context) {
         String connectorName = context.pathParam("connectorName");
-        String arg = context.req.getParameter("config");
+        String arg;
+        if (context.req.getMethod().equals("POST")) {
+            arg = context.body();
+        } else {
+            arg = context.req.getParameter("config");
+        }
         if (arg == null) {
             context.result("failed! query param 'config' is required ");
             return;
@@ -145,7 +151,7 @@ public class RestHandler {
             }
         } catch (Exception e) {
             log.error("Handle createConnector error .", e);
-            context.result("failed");
+            context.result("failed:" + e.getMessage());
         }
     }
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 653e6e9..354fca8 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -197,6 +197,12 @@ public class ConfigManagementServiceImpl implements ConfigManagementService {
 
             newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
             newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAMES));
+            Set<String> connectConfigKeySet = configs.keySet();
+            for (String connectConfigKey : connectConfigKeySet) {
+                if (connectConfigKey.startsWith(RuntimeConfigDefine.TRANSFORMS)) {
+                    newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
+                }
+            }
             converterdConfigs.add(newKeyValue);
         }
         putTaskConfigs(connectorName, converterdConfigs);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/TaskPositionCommitService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/TaskPositionCommitService.java
index 9b8764d..72fa0b9 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/TaskPositionCommitService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/TaskPositionCommitService.java
@@ -36,15 +36,10 @@ public class TaskPositionCommitService extends ServiceThread {
     private final PositionManagementService positionManagementService;
 
 
-    private final PositionManagementService offsetManagementService;
-
-
     public TaskPositionCommitService(Worker worker,
-        PositionManagementService positionManagementService,
-        PositionManagementService offsetManagementService) {
+        PositionManagementService positionManagementService) {
         this.worker = worker;
         this.positionManagementService = positionManagementService;
-        this.offsetManagementService = offsetManagementService;
     }
 
     @Override
@@ -67,8 +62,6 @@ public class TaskPositionCommitService extends ServiceThread {
 
     public void commitTaskPosition() {
         positionManagementService.persist();
-        offsetManagementService.persist();
         positionManagementService.synchronize();
-        offsetManagementService.synchronize();
     }
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStats.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStats.java
new file mode 100644
index 0000000..ed462cb
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStats.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.rocketmq.connect.runtime.stats;
+
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class ConnectStats {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_CONNECT_STATS);
+
+    private final ConnectStatsService connectStatsService;
+
+    private volatile long msgPutTotalYesterdayMorning;
+
+    private volatile long msgPutTotalTodayMorning;
+
+    private volatile long msgGetTotalYesterdayMorning;
+
+    private volatile long msgGetTotalTodayMorning;
+
+    public ConnectStats(ConnectStatsService connectStatsService) {
+        this.connectStatsService = connectStatsService;
+    }
+
+    public void record() {
+        this.msgPutTotalYesterdayMorning = this.msgPutTotalTodayMorning;
+        this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning;
+
+        log.info("yesterday put message total: {}", msgPutTotalTodayMorning - msgPutTotalYesterdayMorning);
+        log.info("yesterday get message total: {}", msgGetTotalTodayMorning - msgGetTotalYesterdayMorning);
+    }
+
+    public long getMsgPutTotalYesterdayMorning() {
+        return msgPutTotalYesterdayMorning;
+    }
+
+    public void setMsgPutTotalYesterdayMorning(long msgPutTotalYesterdayMorning) {
+        this.msgPutTotalYesterdayMorning = msgPutTotalYesterdayMorning;
+    }
+
+    public long getMsgPutTotalTodayMorning() {
+        return msgPutTotalTodayMorning;
+    }
+
+    public void setMsgPutTotalTodayMorning(long msgPutTotalTodayMorning) {
+        this.msgPutTotalTodayMorning = msgPutTotalTodayMorning;
+    }
+
+    public long getMsgGetTotalYesterdayMorning() {
+        return msgGetTotalYesterdayMorning;
+    }
+
+    public void setMsgGetTotalYesterdayMorning(long msgGetTotalYesterdayMorning) {
+        this.msgGetTotalYesterdayMorning = msgGetTotalYesterdayMorning;
+    }
+
+    public long getMsgGetTotalTodayMorning() {
+        return msgGetTotalTodayMorning;
+    }
+
+    public void setMsgGetTotalTodayMorning(long msgGetTotalTodayMorning) {
+        this.msgGetTotalTodayMorning = msgGetTotalTodayMorning;
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
new file mode 100644
index 0000000..b7a03e9
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.rocketmq.connect.runtime.stats;
+
+import java.util.HashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.stats.StatsItem;
+import org.apache.rocketmq.common.stats.StatsItemSet;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+import static org.apache.rocketmq.connect.runtime.common.LoggerName.ROCKETMQ_CONNECT_STATS;
+
+public class ConnectStatsManager {
+
+    public static final String SOURCE_RECORD_WRITE_NUMS = "SOURCE_RECORD_WRITE_NUMS";
+    public static final String SOURCE_RECORD_WRITE_TOTAL_NUMS = "SOURCE_RECORD_WRITE_TOTAL_NUMS";
+
+    public static final String SOURCE_RECORD_WRITE_FAIL_NUMS = "SOURCE_RECORD_WRITE_FAIL_NUMS";
+    public static final String SOURCE_RECORD_WRITE_TOTAL_FAIL_NUMS = "SOURCE_RECORD_WRITE_TOTAL_FAIL_NUMS";
+
+    public static final String SOURCE_RECORD_POLL_NUMS = "SOURCE_RECORD_POLL_NUMS";
+    public static final String SOURCE_RECORD_POLL_TOTAL_NUMS = "SOURCE_RECORD_POLL_TOTAL_NUMS";
+
+    public static final String SOURCE_RECORD_POLL_FAIL_NUMS = "SOURCE_RECORD_POLL_FAIL_NUMS";
+    public static final String SOURCE_RECORD_POLL_FAIL_TOTAL_NUMS = "SOURCE_RECORD_POLL_FAIL_TOTAL_NUMS";
+
+    public static final String SINK_RECORD_READ_NUMS = "SINK_RECORD_READ_NUMS";
+    public static final String SINK_RECORD_READ_TOTAL_NUMS = "SINK_RECORD_READ_TOTAL_NUMS";
+
+    public static final String SINK_RECORD_READ_RT = "SINK_RECORD_READ_RT";
+    public static final String SINK_RECORD_READ_TOTAL_RT = "SINK_RECORD_READ_TOTAL_RT";
+
+    public static final String SINK_RECORD_READ_FAIL_NUMS = "SINK_RECORD_READ_FAIL_NUMS";
+    public static final String SINK_RECORD_READ_TOTAL_FAIL_NUMS = "SINK_RECORD_READ_TOTAL_FAIL_NUMS";
+
+    public static final String SINK_RECORD_READ_FAIL_RT = "SINK_RECORD_READ_FAIL_RT";
+    public static final String SINK_RECORD_READ_TOTAL_FAIL_RT = "SINK_RECORD_READ_TOTAL_FAIL_RT";
+
+    public static final String SINK_RECORD_PUT_NUMS = "SINK_RECORD_PUT_NUMS";
+    public static final String SINK_RECORD_PUT_TOTAL_NUMS = "SINK_RECORD_PUT_TOTAL_NUMS";
+
+    public static final String SINK_RECORD_PUT_RT = "SINK_RECORD_PUT_RT";
+    public static final String SINK_RECORD_PUT_TOTAL_RT = "SINK_RECORD_PUT_TOTAL_RT";
+
+    public static final String SINK_RECORD_PUT_FAIL_NUMS = "SINK_RECORD_PUT_FAIL_NUMS";
+    public static final String SINK_RECORD_PUT_TOTAL_FAIL_NUMS = "SINK_RECORD_PUT_TOTAL_FAIL_NUMS";
+
+    public static final String SINK_RECORD_PUT_FAIL_RT = "SINK_RECORD_PUT_FAIL_RT";
+    public static final String SINK_RECORD_PUT_TOTAL_FAIL_RT = "SINK_RECORD_PUT_TOTAL_FAIL_RT";
+
+    /**
+     * read disk follow stats
+     */
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(ROCKETMQ_CONNECT_STATS);
+
+
+    private static final InternalLogger COMMERCIAL_LOG = InternalLoggerFactory.getLogger(
+        LoggerName.COMMERCIAL_LOGGER_NAME);
+    private final ScheduledExecutorService scheduledExecutorService =
+        ThreadUtils.newSingleThreadScheduledExecutor("ConnectStatsThread", true);
+    private final ScheduledExecutorService commercialExecutor =
+        ThreadUtils.newSingleThreadScheduledExecutor("CommercialStatsThread", true);
+    private final ScheduledExecutorService accountExecutor =
+        ThreadUtils.newSingleThreadScheduledExecutor("AccountStatsThread", true);
+
+    private final HashMap<String, StatsItemSet> statsTable = new HashMap<String, StatsItemSet>();
+    private final String worker;
+    private ConnectConfig connectConfig;
+
+    public ConnectStatsManager(ConnectConfig connectConfig) {
+        this.connectConfig = connectConfig;
+        this.worker = connectConfig.getWorkerId();
+        init();
+    }
+
+    public void init() {
+        this.statsTable.put(SOURCE_RECORD_WRITE_NUMS, new StatsItemSet(SOURCE_RECORD_WRITE_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SOURCE_RECORD_WRITE_TOTAL_NUMS, new StatsItemSet(SOURCE_RECORD_WRITE_TOTAL_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SOURCE_RECORD_WRITE_FAIL_NUMS, new StatsItemSet(SOURCE_RECORD_WRITE_FAIL_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SOURCE_RECORD_WRITE_TOTAL_FAIL_NUMS, new StatsItemSet(SOURCE_RECORD_WRITE_TOTAL_FAIL_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SOURCE_RECORD_POLL_NUMS, new StatsItemSet(SOURCE_RECORD_POLL_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SOURCE_RECORD_POLL_TOTAL_NUMS, new StatsItemSet(SOURCE_RECORD_POLL_TOTAL_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SOURCE_RECORD_POLL_FAIL_NUMS, new StatsItemSet(SOURCE_RECORD_POLL_FAIL_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SOURCE_RECORD_POLL_FAIL_TOTAL_NUMS, new StatsItemSet(SOURCE_RECORD_POLL_FAIL_TOTAL_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_READ_NUMS, new StatsItemSet(SINK_RECORD_READ_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_READ_TOTAL_NUMS, new StatsItemSet(SINK_RECORD_READ_TOTAL_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_READ_FAIL_NUMS, new StatsItemSet(SINK_RECORD_READ_FAIL_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_READ_TOTAL_FAIL_NUMS, new StatsItemSet(SINK_RECORD_READ_TOTAL_FAIL_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_PUT_NUMS, new StatsItemSet(SINK_RECORD_PUT_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_PUT_TOTAL_NUMS, new StatsItemSet(SINK_RECORD_PUT_TOTAL_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_PUT_FAIL_NUMS, new StatsItemSet(SINK_RECORD_PUT_FAIL_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_PUT_TOTAL_FAIL_NUMS, new StatsItemSet(SINK_RECORD_PUT_TOTAL_FAIL_NUMS, this.scheduledExecutorService, log));
+
+        this.statsTable.put(SINK_RECORD_READ_RT, new StatsItemSet(SINK_RECORD_READ_RT, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_READ_TOTAL_RT, new StatsItemSet(SINK_RECORD_READ_TOTAL_RT, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_READ_FAIL_RT, new StatsItemSet(SINK_RECORD_READ_FAIL_RT, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_READ_TOTAL_FAIL_RT, new StatsItemSet(SINK_RECORD_READ_TOTAL_FAIL_RT, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_PUT_RT, new StatsItemSet(SINK_RECORD_PUT_RT, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_PUT_TOTAL_RT, new StatsItemSet(SINK_RECORD_PUT_TOTAL_RT, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_PUT_FAIL_RT, new StatsItemSet(SINK_RECORD_PUT_FAIL_RT, this.scheduledExecutorService, log));
+        this.statsTable.put(SINK_RECORD_PUT_TOTAL_FAIL_RT, new StatsItemSet(SINK_RECORD_PUT_TOTAL_FAIL_RT, this.scheduledExecutorService, log));
+    }
+
+    public void start() {
+    }
+
+    public void shutdown() {
+        this.scheduledExecutorService.shutdown();
+        this.commercialExecutor.shutdown();
+    }
+
+    public StatsItem getStatsItem(final String statsName, final String statsKey) {
+        try {
+            return this.statsTable.get(statsName).getStatsItem(statsKey);
+        } catch (Exception e) {
+        }
+
+        return null;
+    }
+
+    public void incSourceRecordPollTotalNums() {
+        this.statsTable.get(SOURCE_RECORD_POLL_TOTAL_NUMS).addValue(worker, 1, 1);
+
+    }
+
+    public void incSourceRecordPollNums(String taskId) {
+        if (StringUtils.isBlank(taskId)) {
+            return;
+        }
+        this.statsTable.get(SOURCE_RECORD_POLL_NUMS).addValue(taskId, 1, 1);
+    }
+
+    public void incSourceRecordPollTotalFailNums() {
+        this.statsTable.get(SOURCE_RECORD_POLL_FAIL_TOTAL_NUMS).addValue(worker, 1, 1);
+
+    }
+
+    public void incSourceRecordPollFailNums(String taskId) {
+        if (StringUtils.isBlank(taskId)) {
+            return;
+        }
+        this.statsTable.get(SOURCE_RECORD_POLL_FAIL_NUMS).addValue(taskId, 1, 1);
+    }
+
+    public void incSourceRecordWriteTotalNums() {
+        this.statsTable.get(SOURCE_RECORD_WRITE_TOTAL_NUMS).addValue(worker, 1, 1);
+    }
+
+    public void incSourceRecordWriteNums(String taskId) {
+        if (StringUtils.isBlank(taskId)) {
+            return;
+        }
+        this.statsTable.get(SOURCE_RECORD_WRITE_NUMS).addValue(taskId, 1, 1);
+    }
+
+    public void incSourceRecordWriteTotalFailNums() {
+        this.statsTable.get(SOURCE_RECORD_WRITE_TOTAL_FAIL_NUMS).addValue(worker, 1, 1);
+    }
+
+    public void incSourceRecordWriteFailNums(String taskId) {
+        if (StringUtils.isBlank(taskId)) {
+            return;
+        }
+        this.statsTable.get(SOURCE_RECORD_WRITE_FAIL_NUMS).addValue(taskId, 1, 1);
+    }
+
+    public void incSinkRecordPutTotalFailNums() {
+        this.statsTable.get(SINK_RECORD_PUT_TOTAL_FAIL_NUMS).addValue(worker, 1, 1);
+    }
+
+    public void incSinkRecordPutFailNums(String taskId) {
+        if (StringUtils.isBlank(taskId)) {
+            return;
+        }
+        this.statsTable.get(SINK_RECORD_PUT_FAIL_NUMS).addValue(taskId, 1, 1);
+    }
+
+    public void incSinkRecordReadTotalFailNums() {
+        this.statsTable.get(SINK_RECORD_READ_TOTAL_FAIL_NUMS).addValue(worker, 1, 1);
+    }
+
+    public void incSinkRecordReadFailNums(String taskId) {
+        if (StringUtils.isBlank(taskId)) {
+            return;
+        }
+        this.statsTable.get(SINK_RECORD_READ_FAIL_NUMS).addValue(taskId, 1, 1);
+    }
+
+    public void incSinkRecordReadTotalNums() {
+        this.statsTable.get(SINK_RECORD_READ_TOTAL_NUMS).addValue(worker, 1, 1);
+    }
+
+    public void incSinkRecordReadNums(String taskId) {
+        if (StringUtils.isBlank(taskId)) {
+            return;
+        }
+        this.statsTable.get(SINK_RECORD_READ_NUMS).addValue(taskId, 1, 1);
+    }
+
+    public void incSinkRecordReadNums(String taskId, int incValue) {
+        if (StringUtils.isBlank(taskId)) {
+            return;
+        }
+        this.statsTable.get(SINK_RECORD_READ_NUMS).addValue(taskId, incValue, 1);
+    }
+
+    public void incSinkRecordPutTotalNums() {
+        this.statsTable.get(SINK_RECORD_PUT_TOTAL_NUMS).addValue(worker, 1, 1);
+    }
+
+    public void incSinkRecordPutNums(String taskId) {
+        if (StringUtils.isBlank(taskId)) {
+            return;
+        }
+        this.statsTable.get(SINK_RECORD_PUT_NUMS).addValue(taskId, 1, 1);
+    }
+
+
+
+
+    public void incSinkRecordPutTotalFailRT(final long rt) {
+        this.statsTable.get(SINK_RECORD_PUT_TOTAL_FAIL_RT).addValue(worker, (int) rt, 1);
+    }
+
+    public void incSinkRecordPutFailRT(String taskId, final long rt) {
+        if (StringUtils.isBlank(taskId)) {
+            return;
+        }
+        this.statsTable.get(SINK_RECORD_PUT_FAIL_RT).addValue(taskId, (int) rt, 1);
+    }
+
+    public void incSinkRecordReadTotalFailRT(final long rt) {
+        this.statsTable.get(SINK_RECORD_READ_TOTAL_FAIL_RT).addValue(worker, (int) rt, 1);
+    }
+
+    public void incSinkRecordReadFailRT(String taskId, final long rt) {
+        if (StringUtils.isBlank(taskId)) {
+            return;
+        }
+        this.statsTable.get(SINK_RECORD_READ_FAIL_RT).addValue(taskId, (int) rt, 1);
+    }
+
+    public void incSinkRecordReadTotalRT(final long rt) {
+        this.statsTable.get(SINK_RECORD_READ_TOTAL_RT).addValue(worker, (int) rt, 1);
+    }
+
+    public void incSinkRecordReadRT(String taskId, final long rt) {
+        if (StringUtils.isBlank(taskId)) {
+            return;
+        }
+        this.statsTable.get(SINK_RECORD_READ_RT).addValue(taskId, (int) rt, 1);
+    }
+
+    public void incSinkRecordPutTotalRT(final long rt) {
+        this.statsTable.get(SINK_RECORD_PUT_TOTAL_RT).addValue(worker, (int) rt, 1);
+    }
+
+    public void incSinkRecordPutRT(String taskId, final long rt) {
+        if (StringUtils.isBlank(taskId)) {
+            return;
+        }
+        this.statsTable.get(SINK_RECORD_PUT_RT).addValue(taskId, (int) rt, 1);
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsService.java
new file mode 100644
index 0000000..f92ab7a
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsService.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.rocketmq.connect.runtime.stats;
+
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class ConnectStatsService extends ServiceThread {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_CONNECT_STATS);
+
+    private static final int FREQUENCY_OF_SAMPLING = 1000;
+
+    private static final int MAX_RECORDS_OF_SAMPLING = 60 * 10;
+
+    private static int printTPSInterval = 60 * 1;
+
+    private final ConcurrentMap<String, AtomicLong> sourceTaskTimesTotal =
+        new ConcurrentHashMap<String, AtomicLong>(128);
+    private final ConcurrentMap<String, AtomicLong> sinkTaskTimesTotal =
+        new ConcurrentHashMap<String, AtomicLong>(128);
+
+    private final LinkedList<CallSnapshot> sourceTaskTimesList = new LinkedList<CallSnapshot>();
+    private final LinkedList<CallSnapshot> sinkTaskTimesList = new LinkedList<CallSnapshot>();
+
+    private long connectBootTimestamp = System.currentTimeMillis();
+    private ReentrantLock lockPut = new ReentrantLock();
+    private ReentrantLock lockGet = new ReentrantLock();
+
+    private volatile long dispatchMaxBuffer = 0;
+
+    private ReentrantLock lockSampling = new ReentrantLock();
+    private long lastPrintTimestamp = System.currentTimeMillis();
+
+    public ConnectStatsService() {
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder(1024);
+        Long totalTimes = sourceTaskTimesTotal();
+        if (0 == totalTimes) {
+            totalTimes = 1L;
+        }
+        Long sinktotalTimes = sinkTaskTimesTotal();
+        if (0 == sinktotalTimes) {
+            sinktotalTimes = 1L;
+        }
+        sb.append("\truntime: " + this.getFormatRuntime() + "\r\n");
+        sb.append("\tsourceTaskTimesTotal: " + totalTimes + "\r\n");
+        sb.append("\tsinTaskTimesTotal: " + sinktotalTimes + "\r\n");
+        sb.append("\tsourceTps: " + this.getSourceTaskTps() + "\r\n");
+        sb.append("\tgetTotalTps: " + this.getTotalTps() + "\r\n");
+        sb.append("\tsinkTps: " + this.getSinkTaskTps() + "\r\n");
+        return sb.toString();
+    }
+
+    public long sourceTaskTimesTotal() {
+        long rs = 0;
+        for (AtomicLong data : sourceTaskTimesTotal.values()) {
+            rs += data.get();
+        }
+        return rs;
+    }
+
+    public long sinkTaskTimesTotal() {
+        long rs = 0;
+        for (AtomicLong data : sinkTaskTimesTotal.values()) {
+            rs += data.get();
+        }
+        return rs;
+    }
+
+    private String getFormatRuntime() {
+        final long millisecond = 1;
+        final long second = 1000 * millisecond;
+        final long minute = 60 * second;
+        final long hour = 60 * minute;
+        final long day = 24 * hour;
+        final MessageFormat messageFormat = new MessageFormat("[ {0} days, {1} hours, {2} minutes, {3} seconds ]");
+
+        long time = System.currentTimeMillis() - this.connectBootTimestamp;
+        long days = time / day;
+        long hours = (time % day) / hour;
+        long minutes = (time % hour) / minute;
+        long seconds = (time % minute) / second;
+        return messageFormat.format(new Long[] {days, hours, minutes, seconds});
+    }
+
+
+    private String getSourceTaskTps() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(this.getSourceTaskTps(10));
+        sb.append(" ");
+
+        sb.append(this.getSourceTaskTps(60));
+        sb.append(" ");
+
+        sb.append(this.getSourceTaskTps(600));
+
+        return sb.toString();
+    }
+
+    private String getSinkTaskTps() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(this.getSinkTaskTps(10));
+        sb.append(" ");
+
+        sb.append(this.getSinkTaskTps(60));
+        sb.append(" ");
+
+        sb.append(this.getSinkTaskTps(600));
+
+        return sb.toString();
+    }
+
+    private String getTotalTps() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(this.getTotalTps(10));
+        sb.append(" ");
+
+        sb.append(this.getTotalTps(60));
+        sb.append(" ");
+
+        sb.append(this.getTotalTps(600));
+
+        return sb.toString();
+    }
+
+    private String getSourceTaskTps(int time) {
+        String result = "";
+        this.lockSampling.lock();
+        try {
+            CallSnapshot last = this.sourceTaskTimesList.getLast();
+
+            if (this.sourceTaskTimesList.size() > time) {
+                CallSnapshot lastBefore = this.sourceTaskTimesList.get(this.sourceTaskTimesList.size() - (time + 1));
+                result += CallSnapshot.getTPS(lastBefore, last);
+            }
+
+        } finally {
+            this.lockSampling.unlock();
+        }
+        return result;
+    }
+    private String getSinkTaskTps(int time) {
+        String result = "";
+        this.lockSampling.lock();
+        try {
+            CallSnapshot last = this.sinkTaskTimesList.getLast();
+
+            if (this.sinkTaskTimesList.size() > time) {
+                CallSnapshot lastBefore = this.sinkTaskTimesList.get(this.sinkTaskTimesList.size() - (time + 1));
+                result += CallSnapshot.getTPS(lastBefore, last);
+            }
+
+        } finally {
+            this.lockSampling.unlock();
+        }
+        return result;
+    }
+
+    private String getTotalTps(int time) {
+        this.lockSampling.lock();
+        double source = 0;
+        double sink = 0;
+        try {
+            {
+                CallSnapshot last = this.sourceTaskTimesList.getLast();
+
+                if (this.sourceTaskTimesList.size() > time) {
+                    CallSnapshot lastBefore = this.sourceTaskTimesList.get(this.sourceTaskTimesList.size() - (time + 1));
+                    source += CallSnapshot.getTPS(lastBefore, last);
+                }
+            }
+            {
+                CallSnapshot last = this.sinkTaskTimesList.getLast();
+
+                if (this.sinkTaskTimesList.size() > time) {
+                    CallSnapshot lastBefore = this.sinkTaskTimesList.get(this.sinkTaskTimesList.size() - (time + 1));
+                    sink += CallSnapshot.getTPS(lastBefore, last);
+                }
+            }
+        } finally {
+            this.lockSampling.unlock();
+        }
+        return Double.toString(source + sink);
+    }
+
+    public HashMap<String, String> getRuntimeInfo() {
+        HashMap<String, String> result = new HashMap<String, String>(64);
+
+        Long totalTimes = sourceTaskTimesTotal();
+        if (0 == totalTimes) {
+            totalTimes = 1L;
+        }
+
+        Long sinktotalTimes = sinkTaskTimesTotal();
+        if (0 == sinktotalTimes) {
+            sinktotalTimes = 1L;
+        }
+
+        result.put("bootTimestamp", String.valueOf(this.connectBootTimestamp));
+        result.put("runtime", this.getFormatRuntime());
+        result.put("sourceTaskTimesTotal", String.valueOf(totalTimes));
+        result.put("sinkTaskTimesTotal", String.valueOf(sinktotalTimes));
+        result.put("sourceTaskTps", String.valueOf(this.getSourceTaskTps()));
+        result.put("sinkTaskTps", String.valueOf(this.getSinkTaskTps()));
+        return result;
+    }
+
+    public void run() {
+        log.info(this.getServiceName() + " service started");
+
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(FREQUENCY_OF_SAMPLING);
+
+                this.sampling();
+
+                this.printTps();
+            } catch (Exception e) {
+                log.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        log.info(this.getServiceName() + " service end");
+    }
+
+    @Override
+    public String getServiceName() {
+        return ConnectStatsService.class.getSimpleName();
+    }
+
+    private void sampling() {
+        this.lockSampling.lock();
+        try {
+            this.sourceTaskTimesList.add(new CallSnapshot(System.currentTimeMillis(), sourceTaskTimesTotal()));
+            if (this.sourceTaskTimesList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
+                this.sourceTaskTimesList.removeFirst();
+            }
+
+            this.sinkTaskTimesList.add(new CallSnapshot(System.currentTimeMillis(), sinkTaskTimesTotal()));
+            if (this.sourceTaskTimesList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
+                this.sourceTaskTimesList.removeFirst();
+            }
+        } finally {
+            this.lockSampling.unlock();
+        }
+    }
+
+    private void printTps() {
+        if (System.currentTimeMillis() > (this.lastPrintTimestamp + printTPSInterval * 1000)) {
+            this.lastPrintTimestamp = System.currentTimeMillis();
+
+            log.info("[CONNECTPS] source_task_tps {} sink_task_tps {}",
+                this.getSourceTaskTps(printTPSInterval),
+                this.getSinkTaskTps(printTPSInterval)
+            );
+        }
+    }
+
+    public AtomicLong singleSourceTaskTimesTotal(String taskId) {
+        if (StringUtils.isBlank(taskId)) {
+            return null;
+        }
+        AtomicLong rs = sourceTaskTimesTotal.get(taskId);
+        if (null == rs) {
+            rs = new AtomicLong(0);
+            AtomicLong previous = sourceTaskTimesTotal.putIfAbsent(taskId, rs);
+            if (previous != null) {
+                rs = previous;
+            }
+        }
+        return rs;
+    }
+
+    public AtomicLong singleSinkTaskTimesTotal(String taskId) {
+        if (StringUtils.isBlank(taskId)) {
+            return null;
+        }
+        AtomicLong rs = sinkTaskTimesTotal.get(taskId);
+        if (null == rs) {
+            rs = new AtomicLong(0);
+            AtomicLong previous = sinkTaskTimesTotal.putIfAbsent(taskId, rs);
+            if (previous != null) {
+                rs = previous;
+            }
+        }
+        return rs;
+    }
+
+    public Map<String, AtomicLong> getSourceTaskTimesTotal() {
+        return sourceTaskTimesTotal;
+    }
+
+    public Map<String, AtomicLong> getSinkTaskTimesTotal() {
+        return sinkTaskTimesTotal;
+    }
+
+
+    static class CallSnapshot {
+        public final long timestamp;
+        public final long callTimesTotal;
+
+        public CallSnapshot(long timestamp, long callTimesTotal) {
+            this.timestamp = timestamp;
+            this.callTimesTotal = callTimesTotal;
+        }
+
+        public static double getTPS(final CallSnapshot begin, final CallSnapshot end) {
+            long total = end.callTimesTotal - begin.callTimesTotal;
+            Long time = end.timestamp - begin.timestamp;
+
+            double tps = total / time.doubleValue();
+
+            return tps * 1000;
+        }
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index ca49600..7013110 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -36,6 +38,7 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
 import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
@@ -49,6 +52,10 @@ import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTas
 
 public class ConnectUtil {
 
+    private final static AtomicLong GROUP_POSTFIX_ID = new AtomicLong(0);
+
+    private static final String SYS_TASK_CG_PREFIX = "connect-";
+
     public static String createGroupName(String prefix) {
         StringBuilder sb = new StringBuilder();
         sb.append(prefix).append("-");
@@ -208,4 +215,24 @@ public class ConnectUtil {
         return recordPartition;
     }
 
+
+    public static DefaultMQPullConsumer initDefaultMQPullConsumer(ConnectConfig connectConfig, String connectorName, ConnectKeyValue keyValue) {
+        RPCHook rpcHook = null;
+        if (connectConfig.getAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+        }
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rpcHook);
+        consumer.setInstanceName(createInstance(connectorName));
+        String taskGroupId = keyValue.getString("task-group-id");
+        if (StringUtils.isNotBlank(taskGroupId)) {
+            consumer.setConsumerGroup(taskGroupId);
+        } else {
+            consumer.setConsumerGroup(SYS_TASK_CG_PREFIX + connectorName);
+        }
+        if (StringUtils.isNotBlank(connectConfig.getNamesrvAddr())) {
+            consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        }
+        return consumer;
+    }
+
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java
index c8f81ba..b2bf34c 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java
@@ -115,6 +115,7 @@ public class PluginUtils {
         + "|org\\.w3c\\.dom"
         + "|org\\.xml\\.sax"
         + "|io\\.openmessaging\\.connector\\.api"
+        + "|org\\.slf4j"
         + ")\\..*$"
         + "|io\\.openmessaging\\.KeyValue");
 
diff --git a/rocketmq-connect-runtime/src/main/resources/logback.xml b/rocketmq-connect-runtime/src/main/resources/logback.xml
index 6d9bc75..ace099a 100644
--- a/rocketmq-connect-runtime/src/main/resources/logback.xml
+++ b/rocketmq-connect-runtime/src/main/resources/logback.xml
@@ -78,6 +78,24 @@
         </encoder>
     </appender>
 
+    <appender name="RocketmqConnectStatsAppender"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${user.home}/logs/rocketmqconnect/connect_stats.log</file>
+        <append>true</append>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>${user.home}/logs/rocketmqconnect/otherdays/connect_stats.%i.log.gz</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>20</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>128MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
     <appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
         <encoder>
             <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index b9d08aa..a9c51ed 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -38,7 +38,10 @@ import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnect
 import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConverter;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestPositionManageServiceImpl;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSourceTask;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.TestUtils;
 import org.junit.After;
@@ -60,6 +63,9 @@ public class WorkerTest {
     private PositionManagementService offsetManagementService;
 
     @Mock
+    private ConfigManagementService configManagementService;
+
+    @Mock
     private DefaultMQProducer producer;
 
     private ConnectConfig connectConfig;
@@ -75,13 +81,19 @@ public class WorkerTest {
     @Mock
     private ConnectController connectController;
 
+    @Mock
+    private ConnectStatsManager connectStatsManager;
+
+    @Mock
+    private ConnectStatsService connectStatsService;
+
     @Before
     public void init() {
         connectConfig = new ConnectConfig();
         connectConfig.setHttpPort(8081);
         connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
         connectConfig.setNamesrvAddr("localhost:9876");
-        worker = new Worker(connectConfig, positionManagementService, offsetManagementService, plugin);
+        worker = new Worker(connectConfig, positionManagementService, configManagementService, plugin, connectController);
 
         Set<WorkerConnector> workingConnectors = new HashSet<>();
         for (int i = 0; i < 3; i++) {
@@ -105,6 +117,7 @@ public class WorkerTest {
                 new TestConverter(),
                 producer,
                 new AtomicReference(WorkerState.STARTED),
+                connectStatsManager, connectStatsService,
                 transformChain));
         }
         worker.setWorkingTasks(runnables);
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index 02dcd04..c3f5e56 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -53,6 +53,8 @@ import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
 import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.junit.Before;
 import org.junit.Test;
@@ -132,6 +134,12 @@ public class RestHandlerTest {
 
     private AtomicReference<WorkerState> workerState;
 
+    @Mock
+    private ConnectStatsManager connectStatsManager;
+
+    @Mock
+    private ConnectStatsService connectStatsService;
+
     @Before
     public void init() throws Exception {
         workerState = new AtomicReference<>(WorkerState.STARTED);
@@ -198,8 +206,8 @@ public class RestHandlerTest {
             }
         };
         TransformChain<ConnectRecord> transformChain = new TransformChain<ConnectRecord>(new DefaultKeyValue(), new Plugin(new ArrayList<>()));
-        WorkerSourceTask workerSourceTask1 = new WorkerSourceTask("testConnectorName1", sourceTask, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, transformChain);
-        WorkerSourceTask workerSourceTask2 = new WorkerSourceTask("testConnectorName2", sourceTask, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, transformChain);
+        WorkerSourceTask workerSourceTask1 = new WorkerSourceTask("testConnectorName1", sourceTask, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain);
+        WorkerSourceTask workerSourceTask2 = new WorkerSourceTask("testConnectorName2", sourceTask, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain);
         workerTasks = new HashSet<Runnable>() {
             {
                 add(workerSourceTask1);
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java
index e7a8f92..08a8964 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java
@@ -17,16 +17,12 @@
 
 package org.apache.rocketmq.connect.file;
 
-import com.alibaba.fastjson.JSON;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
-import io.openmessaging.connector.api.data.Field;
-import io.openmessaging.connector.api.data.FieldType;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
-import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.connector.api.errors.ConnectException;
 import java.io.IOException;
 import java.io.PrintStream;
@@ -53,19 +49,7 @@ public class FileSinkTask extends SinkTask {
         for (ConnectRecord record : sinkDataEntries) {
             Object payload = record.getData();
             log.trace("Writing line to {}: {}", logFilename(), payload);
-            Schema schema = record.getSchema();
-            if (null == schema || null == schema.getFieldType()) {
-                log.warn("error record {}", JSON.toJSONString(record));
-                continue;
-            }
-            List<Field> fields = schema.getFields();
-            for (Field field : fields) {
-                FieldType type = schema.getFieldType();
-                if (type.equals(FieldType.STRING)) {
-                    log.info("Writing line to {}: {}", logFilename(), payload);
-                    outputStream.println(payload);
-                }
-            }
+            outputStream.println(payload);
         }
 
     }
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
index 74b71b7..b534fd7 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
@@ -85,7 +85,7 @@ public class FileSourceTask extends SourceTask {
                         }
                         log.debug("Skipped to offset {}", lastRecordedOffset);
                     }
-                    streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
+                    streamOffset = (lastRecordedOffset != null) ? Long.valueOf(String.valueOf(lastRecordedOffset)) : 0L;
                 } else {
                     log.info("positionInfo is null!");
                     streamOffset = 0L;
@@ -203,6 +203,7 @@ public class FileSourceTask extends SourceTask {
 
 
     @Override public void start(SourceTaskContext sourceTaskContext) {
+        this.sourceTaskContext = sourceTaskContext;
         fileConfig = new FileConfig();
         fileConfig.load(config);
         log.info("fileName is:{}", fileConfig.getFilename());
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java
new file mode 100644
index 0000000..31f57a6
--- /dev/null
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.file;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.ComponentContext;
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FilterTransform implements Transform<ConnectRecord> {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.FILE_CONNECTOR);
+
+    private KeyValue keyValue;
+
+    @Override public ConnectRecord doTransform(ConnectRecord record) {
+        Object data = record.getData();
+        String s = String.valueOf(data);
+        s = s + ":filter";
+        record.setData(s);
+        return record;
+    }
+
+    @Override public void validate(KeyValue config) {
+
+    }
+
+    @Override public void init(KeyValue config) {
+        this.keyValue = config;
+        log.info("transform config {}", this.keyValue);
+    }
+
+    @Override public void start(ComponentContext componentContext) {
+
+    }
+
+    @Override public void stop() {
+
+    }
+}