You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2023/03/15 13:41:22 UTC

[rocketmq-eventbridge] branch runtimer updated: feat:add TargetRunnerConfigObserver (#58)

This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/runtimer by this push:
     new deae7f7  feat:add TargetRunnerConfigObserver (#58)
deae7f7 is described below

commit deae7f7b7a163962003f031796b8466a6b0b5ee2
Author: shenlin <20...@gmail.com>
AuthorDate: Wed Mar 15 21:41:15 2023 +0800

    feat:add TargetRunnerConfigObserver (#58)
    
    feat: add EventSubscriber & Observer.
---
 .licenserc.yaml                                    |   1 +
 .../RocketMQConnectTargetRunnerAPIImpl.java        |  40 +--
 .../eventbridge/adapter/runtimer/Runtimer.java     |  62 ++---
 .../adapter/runtimer/boot/EventBusListener.java    | 158 +-----------
 .../adapter/runtimer/boot/EventRuleTransfer.java   |  96 ++------
 .../adapter/runtimer/boot/EventTargetPusher.java   |  91 ++++---
 .../runtimer/boot/listener/EventSubscriber.java    |  73 ++++++
 .../runtimer/boot/listener/ListenerFactory.java    | 127 +++++++---
 .../boot/listener/RocketMQEventSubscriber.java     |  43 ++++
 .../boot/listener/TargetRunnerListener.java        |  44 ++++
 ...erTargetEntity.java => TargetRunnerConfig.java} |  43 ++--
 .../runtimer/config/RuntimerConfigDefine.java      |   2 +
 .../AbstractTargetRunnerConfigObserver.java        |  95 ++++++++
 .../service/PusherConfigManageServiceImpl.java     | 271 ---------------------
 ...ervice.java => TargetRunnerConfigObserver.java} |  69 +-----
 .../TargetRunnerConfigOnControllerObserver.java    |  48 ++++
 .../service/TargetRunnerConfigOnDBObserver.java    |  60 +++++
 .../service/TargetRunnerConfigOnFileObserver.java  |  92 +++++++
 .../src/main/resources/target-runner.config        |   7 +
 19 files changed, 704 insertions(+), 718 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index f6d7678..f01ce66 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -23,6 +23,7 @@ header:
 
   paths-ignore:
     - 'dist'
+    - 'config'
     - 'licenses'
     - '**/*.md'
     - 'LICENSE'
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
index fda982c..8971028 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
@@ -20,11 +20,8 @@ package org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect;
 import com.alibaba.fastjson.JSON;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
-
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.Objects;
-
 import lombok.SneakyThrows;
 import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectSourceRunnerContext;
 import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectTargetRunnerContext;
@@ -32,7 +29,6 @@ import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.ActionStatus
 import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.CreateSinkConnectorRequest;
 import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.TransformRequest;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
 import org.apache.rocketmq.eventbridge.domain.common.enums.EventTargetStatusEnum;
 import org.apache.rocketmq.eventbridge.domain.model.Component;
 import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
@@ -45,13 +41,10 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
 
     private final RocketMQConnectClient rocketMQConnectClient;
 
-    private PusherConfigManageService pusherConfigManageService;
-
     public RocketMQConnectTargetRunnerAPIImpl(EventDataRepository eventDataRepository,
-        RocketMQConnectClient rocketMQConnectClient, PusherConfigManageService pusherConfigManageService) {
+        RocketMQConnectClient rocketMQConnectClient) {
         super(eventDataRepository);
         this.rocketMQConnectClient = rocketMQConnectClient;
-        this.pusherConfigManageService = pusherConfigManageService;
     }
 
     @SneakyThrows
@@ -64,14 +57,9 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
         TransformRequest filterTransform = this.buildEventBridgeFilterTransform(filterPattern);
         TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
         TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
-                sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        if(Objects.nonNull(pusherConfigManageService)){
-            pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue);
-        }else {
-            // todo delete
-            rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
-                    sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        }
+            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+        rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
+            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
         RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
         return new Gson().toJson(context);
     }
@@ -87,17 +75,11 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
         TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
         //create
         TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
-                sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        if(Objects.nonNull(pusherConfigManageService)){
-            pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue);
-        }else {
-            // todo delete
-            //stop
-            this.delete(runContext);
-
-            rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
-                    sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        }
+            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+        //stop
+        this.delete(runContext);
+        rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
+            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
         RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
         return new Gson().toJson(context);
     }
@@ -133,6 +115,7 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
 
     /**
      * init sink task config
+     *
      * @param name
      * @param topicName
      * @param sinkClass
@@ -140,7 +123,8 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
      * @param transforms
      * @return
      */
-    private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass, Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) {
+    private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass,
+        Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) {
         CreateSinkConnectorRequest request = new CreateSinkConnectorRequest();
         request.setName(name);
         request.setTopicName(topicName);
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
index 896885d..682b830 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -17,35 +17,33 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.PostConstruct;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventBusListener;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventRuleTransfer;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventTargetPusher;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.RocketMQEventSubscriber;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.PostConstruct;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 /**
  * event bridge runtimer
  *
  * @author artisan
  */
 @Component
-public class Runtimer extends ServiceThread{
+public class Runtimer {
 
     private static final Logger logger = LoggerFactory.getLogger(Runtimer.class);
 
@@ -55,7 +53,7 @@ public class Runtimer extends ServiceThread{
 
     private ListenerFactory listenerFactory;
 
-    private PusherConfigManageService pusherConfigManageService;
+    private TargetRunnerConfigObserver targetRunnerConfigObserver;
 
     private Map<String, List<TargetKeyValue>> taskConfigs = new HashMap<>();
 
@@ -67,51 +65,23 @@ public class Runtimer extends ServiceThread{
 
     private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((Runnable r) -> new Thread(r, "RuntimerScheduledThread"));
 
-    public Runtimer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService configManageService) {
+    public Runtimer(Plugin plugin, ListenerFactory listenerFactory, TargetRunnerConfigObserver configManageService) {
         this.plugin = plugin;
         this.listenerFactory = listenerFactory;
-        this.pusherConfigManageService = configManageService;
+        this.targetRunnerConfigObserver = configManageService;
     }
 
     @PostConstruct
     public void initAndStart() {
         logger.info("init runtimer task config");
-        this.taskConfigs = pusherConfigManageService.getTaskConfigs();
-        listener = new EventBusListener(listenerFactory, pusherConfigManageService);
-        listener.initOrUpdateListenConsumer(taskConfigs);
-        transfer = new EventRuleTransfer(plugin, listenerFactory, pusherConfigManageService);
-        transfer.initOrUpdateTaskTransform(taskConfigs);
-        pusher = new EventTargetPusher(plugin, listenerFactory, pusherConfigManageService);
-        pusher.initOrUpdatePusherTask(taskConfigs);
+        new EventBusListener(listenerFactory, new RocketMQEventSubscriber(listenerFactory)).start();
+        new EventRuleTransfer(listenerFactory).start();
+        new EventTargetPusher(listenerFactory).start();
         startRuntimer();
     }
 
     public void startRuntimer() {
         runtimerState = new AtomicReference<>(RuntimerState.START);
-        this.start();
     }
 
-    @Override
-    public String getServiceName() {
-        return Runtimer.class.getSimpleName();
-    }
-
-    @Override
-    public void run() {
-
-        listener.start();
-
-        transfer.start();
-
-        pusher.start();
-
-        scheduledExecutorService.scheduleAtFixedRate(() -> {
-            try {
-                this.pusherConfigManageService.persist();
-            } catch (Exception e) {
-                logger.error("schedule persist config error.", e);
-            }
-        }, 500, 500, TimeUnit.MILLISECONDS);
-
-    }
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
index 0c40235..bc44668 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
@@ -17,172 +17,38 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
 
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.Lists;
-import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.data.ConnectRecord;
-import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
-import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
+import java.util.List;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.QueueState;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.CollectionUtils;
-
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.*;
 
 /**
  * listen the event and offer to queue
+ *
  * @author artisan
  */
 public class EventBusListener extends ServiceThread {
 
-    private Logger logger = LoggerFactory.getLogger(EventBusListener.class);
-
-    private final ConcurrentHashMap<MessageQueue, Long> messageQueuesOffsetMap;
-
-    private final ConcurrentHashMap<MessageQueue, QueueState> messageQueuesStateMap;
-
-    private List<String> topics = new CopyOnWriteArrayList<>();
-
-    private List<DefaultLitePullConsumer> listenConsumer = new CopyOnWriteArrayList<>();
-
     private ListenerFactory listenerFactory;
 
-    private PusherConfigManageService pusherConfigManageService;
-
-    private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
-
-    private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000);
+    private EventSubscriber eventSubscriber;
 
-    public EventBusListener(ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
-        this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256);
-        this.messageQueuesStateMap = new ConcurrentHashMap<>(256);
+    public EventBusListener(ListenerFactory listenerFactory,
+        EventSubscriber eventSubscriber) {
         this.listenerFactory = listenerFactory;
-        this.pusherConfigManageService = pusherConfigManageService;
-        this.pusherConfigManageService.registerListener(new ConsumerUpdateListenerImpl());
-    }
-
-    /**
-     * init listen consumer
-     * @param taskConfig
-     */
-    public void initOrUpdateListenConsumer(Map<String, List<TargetKeyValue>> taskConfig){
-        if(MapUtils.isEmpty(taskConfig)){
-            logger.warn("initListenConsumer by taskConfig param is empty");
-            return;
-        }
-        List<TargetKeyValue> targetKeyValues = initTaskKeyInfo(taskConfig);
-        this.topics.addAll(listenerFactory.parseTopicListByList(targetKeyValues));
-        for (String topic : topics){
-            DefaultLitePullConsumer pullConsumer = listenerFactory.initDefaultMQPullConsumer(topic);
-            listenConsumer.add(pullConsumer);
-        }
-        logger.info("init or update consumer succeed , consumer is - {}", JSON.toJSONString(listenConsumer));
-    }
-
-    /**
-     * init all task config info
-     * @param taskConfig
-     * @return
-     */
-    private List<TargetKeyValue> initTaskKeyInfo(Map<String, List<TargetKeyValue>> taskConfig) {
-        Set<TargetKeyValue> targetKeyValues = new HashSet<>();
-        for(String connectName : taskConfig.keySet()){
-            targetKeyValues.addAll(taskConfig.get(connectName));
-        }
-        return Lists.newArrayList(targetKeyValues);
+        this.eventSubscriber = eventSubscriber;
     }
 
     @Override
     public void run() {
-        while (!stopped){
-            if(CollectionUtils.isEmpty(listenConsumer)){
-                logger.info("current listen consumer is empty");
-                this.waitForRunning(1000);
-                continue;
-            }
-            for(DefaultLitePullConsumer pullConsumer : listenConsumer) {
-                executorService.submit(() -> {
-                    try {
-                        List<MessageExt> messageExts = pullConsumer.poll(3000);
-                        if (CollectionUtils.isEmpty(messageExts)) {
-                            logger.info("consumer poll message empty , consumer - {}", JSON.toJSONString(pullConsumer));
-                            return;
-                        }
-                        for (MessageExt messageExt : messageExts) {
-                            ConnectRecord eventRecord = convertToSinkRecord(messageExt);
-                            listenerFactory.offerEventRecord(eventRecord);
-                            logger.debug("consumer - {} - offer listen event record -  {} - by - message event- {}", JSON.toJSONString(pullConsumer), eventRecord, messageExt);
-                        }
-                    } finally {
-                        pullConsumer.commitSync();
-                    }
-                });
-            }
-        }
-    }
-
-    @Override
-    public String getServiceName() {
-        return this.getClass().getSimpleName();
-    }
-
-    /**
-     * MessageExt convert to connect record
-     * @param messageExt
-     * @return
-     */
-    private ConnectRecord convertToSinkRecord(MessageExt messageExt) {
-        Map<String, String> properties = messageExt.getProperties();
-        Schema schema;
-        Long timestamp;
-        ConnectRecord sinkRecord;
-        String connectTimestamp = properties.get(RuntimerConfigDefine.CONNECT_TIMESTAMP);
-        timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
-        String connectSchema = properties.get(RuntimerConfigDefine.CONNECT_SCHEMA);
-        schema = StringUtils.isNotEmpty(connectSchema) ? JSON.parseObject(connectSchema, Schema.class) : null;
-        byte[] body = messageExt.getBody();
-        RecordPartition recordPartition = listenerFactory.convertToRecordPartition(messageExt.getTopic(), messageExt.getBrokerName(), messageExt.getQueueId());
-        RecordOffset recordOffset = listenerFactory.convertToRecordOffset(messageExt.getQueueOffset());
-        String bodyStr = new String(body, StandardCharsets.UTF_8);
-        sinkRecord = new ConnectRecord(recordPartition, recordOffset, timestamp, schema, bodyStr);
-        KeyValue keyValue = new DefaultKeyValue();
-        keyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME, messageExt.getTopic());
-        if (MapUtils.isNotEmpty(properties)) {
-            for (Map.Entry<String, String> entry : properties.entrySet()) {
-                keyValue.put(entry.getKey(), entry.getValue());
-            }
+        while (!stopped) {
+            List<ConnectRecord> recordList = eventSubscriber.pull();
+            listenerFactory.offerEventRecords(recordList);
         }
-        sinkRecord.addExtension(keyValue);
-        return sinkRecord;
     }
 
-    /**
-     * consumer update listener
-     */
-    class ConsumerUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
-
-        @Override
-        public void onConfigUpdate(PusherTargetEntity targetEntity) {
-            logger.info("consumer update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
-            Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
-            lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
-            initOrUpdateListenConsumer(lastTargetMap);
-        }
+    @Override public String getServiceName() {
+        return EventBusListener.class.getSimpleName();
     }
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index f14fdec..eaee64c 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -19,19 +19,23 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
 
 import com.alibaba.fastjson.JSON;
 import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.*;
-
 /**
  * receive event and transfer the rule to pusher
  */
@@ -41,23 +45,10 @@ public class EventRuleTransfer extends ServiceThread {
 
     private ListenerFactory listenerFactory;
 
-    private PusherConfigManageService pusherConfigManageService;
-
-    private Plugin plugin;
+    private ExecutorService executorService = new ThreadPoolExecutor(20, 60, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
 
-    Map<TargetKeyValue/*taskConfig*/, TransformEngine<ConnectRecord>/*taskTransform*/> taskTransformMap = new ConcurrentHashMap<>(20);
-
-    private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
-
-    public EventRuleTransfer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
-        this.plugin = plugin;
+    public EventRuleTransfer(ListenerFactory listenerFactory) {
         this.listenerFactory = listenerFactory;
-        this.pusherConfigManageService = pusherConfigManageService;
-        this.pusherConfigManageService.registerListener(new TransformUpdateListenerImpl());
-    }
-
-    public void initOrUpdateTaskTransform(Map<String, List<TargetKeyValue>> taskConfig){
-        this.taskTransformMap.putAll(initSinkTaskTransformInfo(taskConfig));
     }
 
     @Override
@@ -67,9 +58,10 @@ public class EventRuleTransfer extends ServiceThread {
 
     @Override
     public void run() {
-        while (!stopped){
+        while (!stopped) {
+            //TODO JAVA8 并发流处理
             ConnectRecord eventRecord = listenerFactory.takeEventRecord();
-            if(Objects.isNull(eventRecord)){
+            if (Objects.isNull(eventRecord)) {
                 logger.info("listen eventRecord is empty, continue by curTime - {}", System.currentTimeMillis());
                 this.waitForRunning(1000);
                 continue;
@@ -77,62 +69,22 @@ public class EventRuleTransfer extends ServiceThread {
             executorService.submit(() -> {
                 // extension add sub
                 // rule - target
-                for (TargetKeyValue targetKeyValue : taskTransformMap.keySet()){
-                    // add threadPool for cup task
-                    // attention coreSize
-                    TransformEngine<ConnectRecord> transformEngine = taskTransformMap.get(targetKeyValue);
+                listenerFactory.getTaskTransformMap().entrySet().forEach(entry -> {
+                    TransformEngine<ConnectRecord> transformEngine = entry.getValue();
                     ConnectRecord transformRecord = transformEngine.doTransforms(eventRecord);
-                    if(Objects.isNull(transformRecord)){
-                        continue;
+                    if (Objects.isNull(transformRecord)) {
+                        return;
                     }
                     // a bean for maintain
-                    Map<TargetKeyValue,ConnectRecord> targetMap = new HashMap<>();
-                    targetMap.put(targetKeyValue, transformRecord);
-                    listenerFactory.offerTargetTaskQueue(targetMap);
+                    listenerFactory.offerTargetTaskQueue(transformRecord);
+                    logger.debug("offer target task queue succeed, targetMap - {}", JSON.toJSONString(transformRecord));
+                });
+
 
-                    logger.debug("offer target task queue succeed, targetMap - {}", JSON.toJSONString(targetMap));
-                    // metrics
-                    // logger
-                    // key->connectKeyValue to simple name
-                    // connectRecord add system properties for taskClass info
-                }
             });
         }
     }
 
-    /**
-     * Init sink task transform map
-     * key: task config
-     * value: transformEngine
-     * @param taskConfig
-     * @return
-     */
-    private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(Map<String, List<TargetKeyValue>> taskConfig) {
-        Map<TargetKeyValue, TransformEngine<ConnectRecord>> curTaskTransformMap = new HashMap<>();
-        Set<TargetKeyValue> allTaskKeySet = new HashSet<>();
-        for(String connectName : taskConfig.keySet()){
-            List<TargetKeyValue> taskKeyList = taskConfig.get(connectName);
-            allTaskKeySet.addAll(new HashSet<>(taskKeyList));
-        }
-        for(TargetKeyValue keyValue : allTaskKeySet){
-            TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(keyValue, plugin);
-            curTaskTransformMap.put(keyValue, transformChain);
-        }
-        logger.info("init sink task transform info succeed, transform map - {}", JSON.toJSONString(curTaskTransformMap));
-        return curTaskTransformMap;
-    }
 
-    /**
-     * transform update listener
-     */
-    class TransformUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
 
-        @Override
-        public void onConfigUpdate(PusherTargetEntity targetEntity) {
-            logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
-            Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
-            lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
-            initOrUpdateTaskTransform(lastTargetMap);
-        }
-    }
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
index d324fe4..35c6bb2 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -22,27 +22,32 @@ import com.google.common.collect.Lists;
 import io.netty.util.internal.ConcurrentSet;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.commons.collections.MapUtils;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-
 /**
  * event target push to sink task
+ *
  * @author artisan
  */
-public class EventTargetPusher extends ServiceThread {
+public class EventTargetPusher extends ServiceThread{
 
     private static final Logger logger = LoggerFactory.getLogger(EventTargetPusher.class);
 
@@ -56,29 +61,27 @@ public class EventTargetPusher extends ServiceThread {
 
     private ListenerFactory listenerFactory;
 
-    private PusherConfigManageService pusherConfigManageService;
+    private TargetRunnerConfigObserver targetRunnerConfigObserver;
 
     private List<SinkTask> pusherTasks = new CopyOnWriteArrayList<>();
 
-    public EventTargetPusher(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
-        this.plugin = plugin;
+    public EventTargetPusher(ListenerFactory listenerFactory) {
         this.listenerFactory = listenerFactory;
-        this.pusherConfigManageService = pusherConfigManageService;
-        this.pusherConfigManageService.registerListener(new TargetUpdateListenerImpl());
     }
 
     /**
      * init running tasks
+     *
      * @param taskConfig
      */
-    public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig){
+    public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig) {
         Set<TargetKeyValue> taskProperty = new HashSet<>();
-        for(String connectName : taskConfig.keySet()){
+        for (String connectName : taskConfig.keySet()) {
             List<TargetKeyValue> targetKeyValues = taskConfig.get(connectName);
             taskProperty.addAll(new HashSet<>(targetKeyValues));
         }
-        for(TargetKeyValue targetKeyValue : taskProperty){
-            try{
+        for (TargetKeyValue targetKeyValue : taskProperty) {
+            try {
                 String taskClass = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
                 ClassLoader loader = plugin.getPluginClassLoader(taskClass);
                 Class taskClazz;
@@ -98,7 +101,7 @@ public class EventTargetPusher extends ServiceThread {
                     Plugin.compareAndSwapLoaders(loader);
                 }
                 logger.info("init target task succeed, target key - {}", JSON.toJSONString(targetKeyValue));
-            }catch (Exception exception){
+            } catch (Exception exception) {
                 exception.printStackTrace();
             }
         }
@@ -106,26 +109,26 @@ public class EventTargetPusher extends ServiceThread {
 
     @Override
     public void run() {
-        while (!stopped){
-            Map<TargetKeyValue, ConnectRecord> taskPusher = listenerFactory.takeTargetMap();
-            if(MapUtils.isEmpty(taskPusher)){
-                logger.info("current target pusher is empty");
-                this.waitForRunning(1000);
-                continue;
-            }
-            logger.info("start push content by pusher - {}", JSON.toJSONString(taskPusher));
-
-            TargetKeyValue targetKeyValue = taskPusher.keySet().iterator().next();
-            // task-id for unique-key at ConnectKeyValue
-            // ConnectKeyValue -> new class for name
-            // also add in ConnectRecord class system property
-            String taskPushName = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
-            // add thread pool
-            for(SinkTask sinkTask : pusherTasks){
-                if(sinkTask.getClass().getName().equals(taskPushName)){
-                    sinkTask.put(Lists.newArrayList(taskPusher.get(targetKeyValue)));
-                }
-            }
+        while (!stopped) {
+//            Map<TargetKeyValue, ConnectRecord> taskPusher = listenerFactory.takeTargetMap();
+//            if (MapUtils.isEmpty(taskPusher)) {
+//                logger.info("current target pusher is empty");
+//                this.waitForRunning(1000);
+//                continue;
+//            }
+//            logger.info("start push content by pusher - {}", JSON.toJSONString(taskPusher));
+//
+//            TargetKeyValue targetKeyValue = taskPusher.keySet().iterator().next();
+//            // task-id for unique-key at ConnectKeyValue
+//            // ConnectKeyValue -> new class for name
+//            // also add in ConnectRecord class system property
+//            String taskPushName = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
+//            // add thread pool
+//            for (SinkTask sinkTask : pusherTasks) {
+//                if (sinkTask.getClass().getName().equals(taskPushName)) {
+//                    sinkTask.put(Lists.newArrayList(taskPusher.get(targetKeyValue)));
+//                }
+//            }
         }
     }
 
@@ -134,17 +137,5 @@ public class EventTargetPusher extends ServiceThread {
         return EventTargetPusher.class.getSimpleName();
     }
 
-    /**
-     * target update listener
-     */
-    class TargetUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
-
-        @Override
-        public void onConfigUpdate(PusherTargetEntity targetEntity) {
-            logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
-            Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
-            lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
-            initOrUpdatePusherTask(lastTargetMap);
-        }
-    }
+
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
new file mode 100644
index 0000000..23c499b
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.List;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+public abstract class EventSubscriber implements TargetRunnerListener {
+    /** Refresh hook: every target runner callback in this class just calls it; the refresh itself is left to subclasses. */
+    abstract void refresh();
+
+    /**
+     * Pull connect records from the store. Documented as blocking while the store is empty.
+     *
+     * @return the pulled connect records
+     */
+    public abstract List<ConnectRecord> pull();
+
+    /**
+     * Commit the connect records.
+     *
+     * @param connectRecordList the connect records to commit
+     */
+    public abstract void commit(List<ConnectRecord> connectRecordList);
+
+    /**
+     * Called when a new target runner is added to the runtimer; triggers a refresh.
+     *
+     * @param targetRunnerConfig config of the added target runner (not read here)
+     */
+    @Override
+    public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        this.refresh();
+    }
+
+    /**
+     * Called when an existing target runner is updated; triggers a refresh.
+     *
+     * @param targetRunnerConfig config of the updated target runner (not read here)
+     */
+    @Override
+    public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        // The updated config itself is not inspected; only a refresh is triggered.
+        this.refresh();
+    }
+
+    /**
+     * Called when an existing target runner is deleted from the runtimer; triggers a refresh.
+     *
+     * @param targetRunnerConfig config of the deleted target runner (not read here)
+     */
+    @Override
+    public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        this.refresh();
+    }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
index 07f010e..2d22094 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
@@ -17,32 +17,42 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
 
+import com.alibaba.fastjson.JSON;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 @Component
-public class ListenerFactory {
+public class ListenerFactory implements TargetRunnerListener {
 
     private final static Logger logger = LoggerFactory.getLogger(LoggerName.EventBus_Listener);
 
@@ -52,13 +62,19 @@ public class ListenerFactory {
 
     public static final String QUEUE_OFFSET = "queueOffset";
 
-    private BlockingQueue<PusherTargetEntity> pusherTargetQueue = new LinkedBlockingQueue<>(1000);
+    private Plugin plugin;
+
+    private BlockingQueue<TargetRunnerConfig> pusherTargetQueue = new LinkedBlockingQueue<>(1000);
 
     private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000);
 
     private BlockingQueue<ConnectRecord> eventRecord = new LinkedBlockingQueue<>(50000);
 
-    private BlockingQueue<Map<TargetKeyValue, ConnectRecord>> targetQueue = new LinkedBlockingQueue<>(50000);
+    private BlockingQueue<ConnectRecord> targetQueue = new LinkedBlockingQueue<>(50000);
+
+    private Map<String/*TargetRunnerName*/, TransformEngine<ConnectRecord>> taskTransformMap = new ConcurrentHashMap<>(20);
+    private Map<String/*TargetRunnerName*/, SinkTask> pusherTaskMap = new ConcurrentHashMap<>(20);
+    private Map<String/*TargetRunnerName*/, TargetRunnerConfig> targetRunnerConfigMap = new ConcurrentHashMap<>(20);
 
     @Value("${rocketmq.namesrvAddr:}")
     private String namesrvAddr;
@@ -70,14 +86,14 @@ public class ListenerFactory {
             consumer.setNamesrvAddr(namesrvAddr);
             consumer.subscribe(topic, "*");
             consumer.start();
-        }catch (Exception exception){
+        } catch (Exception exception) {
             logger.error("init default pull consumer exception, topic -" + topic + "-stackTrace-", exception);
         }
         return consumer;
     }
 
-    public PusherTargetEntity takeTaskConfig(){
-        if(pusherTargetQueue.isEmpty()){
+    public TargetRunnerConfig takeTaskConfig() {
+        if (pusherTargetQueue.isEmpty()) {
             return null;
         }
         try {
@@ -90,45 +106,47 @@ public class ListenerFactory {
 
     /**
      * Offer listener event
+     *
      * @param messageExt
      * @return
      */
-    public boolean offerListenEvent(MessageExt messageExt){
+    public boolean offerListenEvent(MessageExt messageExt) {
         return eventMessage.offer(messageExt);
     }
 
     public MessageExt takeListenerEvent() {
-        if(eventMessage.isEmpty()){
+        if (eventMessage.isEmpty()) {
             return null;
         }
         try {
             return eventMessage.take();
-        }catch (Exception exception){
+        } catch (Exception exception) {
             exception.printStackTrace();
         }
         return null;
     }
 
     /**
-     * offer event record
-     * @param connectRecord
-     * @return
+     * offer event records
+     *
+     * @param connectRecords
      */
-    public boolean offerEventRecord(ConnectRecord connectRecord){
-        return eventRecord.offer(connectRecord);
+   public boolean  offerEventRecords(List<ConnectRecord> connectRecords) {
+        return eventRecord.addAll(connectRecords);
     }
 
     /**
      * take event record
+     *
      * @return
      */
     public ConnectRecord takeEventRecord() {
-        if(eventRecord.isEmpty()){
+        if (eventRecord.isEmpty()) {
             return null;
         }
         try {
             return eventRecord.take();
-        }catch (Exception exception){
+        } catch (Exception exception) {
             logger.error("take event record exception - stack-> ", exception);
         }
         return null;
@@ -148,6 +166,7 @@ public class ListenerFactory {
 
     /**
      * parse topic list by task config
+     *
      * @param taskConfig
      * @return
      */
@@ -162,12 +181,13 @@ public class ListenerFactory {
 
     /**
      * parse topic list by task config list
+     *
      * @param taskConfigs
      * @return
      */
     public List<String> parseTopicListByList(List<TargetKeyValue> taskConfigs) {
         Set<String> allTopicList = Sets.newHashSet();
-        for(TargetKeyValue taskConfig : taskConfigs){
+        for (TargetKeyValue taskConfig : taskConfigs) {
             String messageQueueStr = taskConfig.getString(RuntimerConfigDefine.CONNECT_TOPICNAME);
             if (StringUtils.isBlank(messageQueueStr)) {
                 continue;
@@ -180,6 +200,7 @@ public class ListenerFactory {
 
     /**
      * parse msg queue by queue json
+     *
      * @param messageQueueStr
      * @return
      */
@@ -207,19 +228,67 @@ public class ListenerFactory {
         return recordOffset;
     }
 
-    public boolean offerTargetTaskQueue(Map<TargetKeyValue, ConnectRecord> targetMap){
-        return targetQueue.offer(targetMap);
+    public boolean offerTargetTaskQueue(ConnectRecord connectRecord) {
+        return targetQueue.offer(connectRecord);
     }
 
-    public Map<TargetKeyValue, ConnectRecord> takeTargetMap(){
-        if(targetQueue.isEmpty()){
+    public ConnectRecord takeTargetMap() {
+        if (targetQueue.isEmpty()) {
             return null;
         }
-        try{
+        try {
             return targetQueue.take();
-        }catch (Exception exception){
+        } catch (Exception exception) {
             exception.printStackTrace();
         }
         return null;
     }
+
+    /**
+     * Build the sink task transform map: key is the task config, value is its TransformEngine.
+     *
+     * @param taskConfig task config lists keyed by a name (iterated as connectName); equal configs are merged
+     * @return a new map holding one TransformEngine per distinct (by equals) task config
+     */
+    private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(
+        Map<String, List<TargetKeyValue>> taskConfig) {
+        Map<TargetKeyValue, TransformEngine<ConnectRecord>> curTaskTransformMap = new HashMap<>();
+        Set<TargetKeyValue> allTaskKeySet = new HashSet<>();
+        for (String connectName : taskConfig.keySet()) {
+            List<TargetKeyValue> taskKeyList = taskConfig.get(connectName);
+            allTaskKeySet.addAll(new HashSet<>(taskKeyList));
+        }
+        for (TargetKeyValue keyValue : allTaskKeySet) {
+            TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(keyValue, plugin);
+            curTaskTransformMap.put(keyValue, transformChain);
+        }
+        logger.info("init sink task transform info succeed, transform map - {}", JSON.toJSONString(curTaskTransformMap));
+        return curTaskTransformMap;
+    }
+
+
+    @Override public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        // TODO: refresh taskTransformMap, pusherTaskMap and targetRunnerConfigMap (currently a no-op).
+    }
+
+    @Override public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        // Currently a no-op: the updated config is ignored.
+    }
+
+    @Override public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        // Currently a no-op: the deleted config is ignored.
+    }
+
+    public Map<String, TransformEngine<ConnectRecord>> getTaskTransformMap() {
+        return taskTransformMap;
+    }
+
+    public Map<String, SinkTask> getPusherTaskMap() {
+        return pusherTaskMap;
+    }
+
+    public Map<String, TargetRunnerConfig> getTargetRunnerConfigMap() {
+        return targetRunnerConfigMap;
+    }
+
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
new file mode 100644
index 0000000..7c8e5a3
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.List;
+
+public class RocketMQEventSubscriber extends EventSubscriber {
+    /** Factory handed in by the constructor; it is stored but not read anywhere in this class yet. */
+    private ListenerFactory listenerFactory;
+
+    public RocketMQEventSubscriber(
+        ListenerFactory listenerFactory) {
+        this.listenerFactory = listenerFactory;
+    }
+
+    @Override void refresh() {
+        // Empty for now: nothing is refreshed yet.
+    }
+    /** Placeholder: always returns null; no records are pulled yet. */
+    @Override public List<ConnectRecord> pull() {
+        return null;
+    }
+
+    @Override
+    public void commit(List<ConnectRecord> connectRecordList) {
+        // Empty for now: nothing is committed yet.
+    }
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
new file mode 100644
index 0000000..f528b7b
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+public interface TargetRunnerListener {
+
+    /**
+     * Called when a new target runner is added to the runtimer.
+     *
+     * @param targetRunnerConfig config of the added target runner
+     */
+    void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+    /**
+     * Called when an existing target runner is updated.
+     *
+     * @param targetRunnerConfig config of the updated target runner
+     */
+    void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+    /**
+     * Called when an existing target runner is deleted from the runtimer.
+     *
+     * @param targetRunnerConfig config of the deleted target runner
+     */
+    void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
similarity index 54%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java
rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
index dc7ced4..45bc5f1 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
@@ -17,34 +17,45 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity;
 
-import lombok.Data;
-
 import java.io.Serializable;
-import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import lombok.Data;
 
 /**
  * pusher target key config
  */
 @Data
-public class PusherTargetEntity implements Serializable {
+public class TargetRunnerConfig implements Serializable {
 
-    private String connectName;
+    private String name;
 
-    private List<TargetKeyValue> targetKeyValues;
+    /**
+     * All data are reserved in this map.
+     */
+    private Map<String, String> properties;
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        TargetRunnerConfig config = (TargetRunnerConfig) o;
+        return Objects.equals(name, config.name) && Objects.equals(properties, config.properties);
+    }
 
     @Override
-    public boolean equals(Object object){
-        if (object != null && object.getClass() == this.getClass()) {
-            PusherTargetEntity targetEntity = (PusherTargetEntity) object;
-            return this.connectName.equals(targetEntity.getConnectName())
-                    && this.targetKeyValues.size() == targetEntity.getTargetKeyValues().size()
-                    && this.targetKeyValues.containsAll(targetEntity.getTargetKeyValues());
-        }
-        return false;
+    public int hashCode() {
+        return Objects.hash(name, properties);
     }
 
     @Override
-    public int hashCode(){
-        return this.connectName.hashCode() + this.targetKeyValues.hashCode();
+    public String toString() {
+        //TODO
+        return "TargetRunnerConfig{" +
+            "connectName='" + name + '\'' +
+            ", properties=" + properties +
+            '}';
     }
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
index ebf1c88..500f300 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
@@ -80,6 +80,8 @@ public class RuntimerConfigDefine {
 
     public static final String CONNECT_TOPICNAME = "connect-topicname";
 
+    public static final String CONNECT_RULE_NAME = "connect-rule-name";
+
     public static final String CONNECT_TOPICNAMES = "connect-topicnames";
 
     public static final String CONNECT_SOURCE_PARTITION = "connect-source-partition";
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
new file mode 100644
index 0000000..619bc63
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import com.google.common.collect.Sets;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+@Service
+public abstract class AbstractTargetRunnerConfigObserver implements TargetRunnerConfigObserver {
+
+    private Set<TargetRunnerConfig> targetRunnerConfigs = Sets.newHashSet();
+
+    /**
+     * All listeners to trigger while config change.
+     */
+    private Set<TargetRunnerListener> targetRunnerConfigListeners = new HashSet<>();
+
+    public Set<TargetRunnerConfig> getTargetRunnerConfig() {
+        return targetRunnerConfigs;
+    }
+
+    public abstract Set<TargetRunnerConfig> getLatestTargetRunnerConfig();
+
+    @Override
+    public void registerListener(TargetRunnerListener listener) {
+        this.targetRunnerConfigListeners.add(listener);
+    }
+
+    void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        this.targetRunnerConfigs.add(targetRunnerConfig);
+        if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+            return;
+        }
+        for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+            listener.onAddTargetRunner(targetRunnerConfig);
+        }
+    }
+
+    /**
+     * Call when the old target runner updated.
+     *
+     * @param targetRunnerConfig
+     */
+    void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        this.targetRunnerConfigs.add(targetRunnerConfig);
+        if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+            return;
+        }
+        for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+            listener.onUpdateTargetRunner(targetRunnerConfig);
+        }
+    }
+
+    /**
+     * Call when the old target runner deleted from runtimer.
+     *
+     * @param targetRunnerConfig
+     */
+    void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        this.targetRunnerConfigs.remove(targetRunnerConfig);
+        if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+            return;
+        }
+        for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+            listener.onDeleteTargetRunner(targetRunnerConfig);
+        }
+    }
+
+    public Map<String, List<TargetKeyValue>> getTaskConfigs() {
+        return null;
+    }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
deleted file mode 100644
index 58a66fd..0000000
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
-
-import com.google.common.collect.Lists;
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.component.connector.Connector;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.FilePathConfigUtil;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.FileBaseKeyValueStore;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.KeyValueStore;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.JsonConverter;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.ListConverter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-import org.springframework.util.CollectionUtils;
-
-import javax.annotation.PostConstruct;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-@Service
-public class PusherConfigManageServiceImpl implements PusherConfigManageService {
-
-    /**
-     * plugin for recognize class loader
-     */
-    private Plugin plugin;
-
-    /**
-     * Current connector configs in the store.
-     */
-    private KeyValueStore<String, TargetKeyValue> connectorKeyValueStore;
-
-    /**
-     * Current task configs in the store.
-     */
-    private KeyValueStore<String, List<TargetKeyValue>> taskKeyValueStore;
-
-    /**
-     * All listeners to trigger while config change.
-     */
-    private Set<TargetConfigUpdateListener> targetConfigUpdateListeners;
-
-    private Set<String> connectTopicNames;
-
-    @Value("${runtimer.storePathRootDir:}")
-    private String storeRootPath;
-
-    public PusherConfigManageServiceImpl(Plugin plugin){
-        this.plugin = plugin;
-        this.connectTopicNames = new CopyOnWriteArraySet<>();
-        this.targetConfigUpdateListeners = new HashSet<>();
-    }
-
-    @PostConstruct
-    public void initStoreKeyValue(){
-        this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
-                FilePathConfigUtil.getConnectorConfigPath(this.storeRootPath),
-                new JsonConverter(),
-                new JsonConverter(TargetKeyValue.class));
-        this.taskKeyValueStore = new FileBaseKeyValueStore<>(
-                FilePathConfigUtil.getTaskConfigPath(this.storeRootPath),
-                new JsonConverter(),
-                new ListConverter(TargetKeyValue.class));
-        this.connectorKeyValueStore.load();
-        this.taskKeyValueStore.load();
-    }
-
-    /**
-     * get all connector configs enabled
-     *
-     * @return
-     */
-    @Override
-    public Map<String, TargetKeyValue> getConnectorConfigs() {
-        Map<String, TargetKeyValue> result = new HashMap<>();
-        Map<String, TargetKeyValue> connectorConfigs = connectorKeyValueStore.getKVMap();
-        for (String connectorName : connectorConfigs.keySet()) {
-            TargetKeyValue config = connectorConfigs.get(connectorName);
-            if (0 != config.getInt(RuntimerConfigDefine.CONFIG_DELETED)) {
-                continue;
-            }
-            result.put(connectorName, config);
-        }
-        return result;
-    }
-
-    @Override
-    public String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception {
-        TargetKeyValue exist = connectorKeyValueStore.get(connectorName);
-        if (null != exist) {
-            Long updateTimestamp = exist.getLong(RuntimerConfigDefine.UPDATE_TIMESTAMP);
-            if (null != updateTimestamp) {
-                configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, updateTimestamp);
-            }
-        }
-        if (configs.equals(exist)) {
-            return "Connector with same config already exist.";
-        }
-
-        Long currentTimestamp = System.currentTimeMillis();
-        configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
-        for (String requireConfig : RuntimerConfigDefine.REQUEST_CONFIG) {
-            if (!configs.containsKey(requireConfig)) {
-                return "Request config key: " + requireConfig;
-            }
-        }
-
-        String connectorClass = configs.getString(RuntimerConfigDefine.CONNECTOR_CLASS);
-        ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
-        Class clazz;
-        if (null != classLoader) {
-            clazz = Class.forName(connectorClass, true, classLoader);
-        } else {
-            clazz = Class.forName(connectorClass);
-        }
-        final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
-        connector.validate(configs);
-        connector.init(configs);
-        connectorKeyValueStore.put(connectorName, configs);
-        recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
-        return "";
-    }
-
-    @Override
-    public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs) {
-        int maxTask = configs.getInt(RuntimerConfigDefine.MAX_TASK, 1);
-        List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
-        List<TargetKeyValue> converterdConfigs = new ArrayList<>();
-        for (KeyValue keyValue : taskConfigs) {
-            TargetKeyValue newKeyValue = new TargetKeyValue();
-            for (String key : keyValue.keySet()) {
-                newKeyValue.put(key, keyValue.getString(key));
-            }
-            newKeyValue.put(RuntimerConfigDefine.TASK_CLASS, connector.taskClass().getName());
-            newKeyValue.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
-
-            newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
-            newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAMES));
-            Set<String> connectConfigKeySet = configs.keySet();
-            for (String connectConfigKey : connectConfigKeySet) {
-                if (connectConfigKey.startsWith(RuntimerConfigDefine.TRANSFORMS)) {
-                    newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
-                }
-            }
-            converterdConfigs.add(newKeyValue);
-            connectTopicNames.add(configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
-        }
-        putTaskConfigs(connectorName, converterdConfigs);
-    }
-
-    @Override
-    public void removeConnectorConfig(String connectorName) {
-        TargetKeyValue config = connectorKeyValueStore.get(connectorName);
-        if(Objects.isNull(config)){
-            return;
-        }
-        config.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, System.currentTimeMillis());
-        config.put(RuntimerConfigDefine.CONFIG_DELETED, 1);
-        List<TargetKeyValue> taskConfigList = taskKeyValueStore.get(connectorName);
-        taskConfigList.add(config);
-        connectorKeyValueStore.put(connectorName, config);
-        putTaskConfigs(connectorName, taskConfigList);
-    }
-
-    @Override
-    public Map<String, List<TargetKeyValue>> getTaskConfigs() {
-        Map<String, List<TargetKeyValue>> result = new HashMap<>();
-        Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap();
-        Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs();
-        for (String connectorName : taskConfigs.keySet()) {
-            if (!filteredConnector.containsKey(connectorName)) {
-                continue;
-            }
-            result.put(connectorName, taskConfigs.get(connectorName));
-        }
-        return result;
-    }
-
-    @Override
-    public Set<PusherTargetEntity> getTargetInfo() {
-        Set<PusherTargetEntity> result = new HashSet<>();
-        Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap();
-        Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs();
-        for (String connectorName : taskConfigs.keySet()) {
-            if (!filteredConnector.containsKey(connectorName)) {
-                continue;
-            }
-            PusherTargetEntity targetEntity = new PusherTargetEntity();
-            targetEntity.setConnectName(connectorName);
-            targetEntity.setTargetKeyValues(taskConfigs.get(connectorName));
-            result.add(targetEntity);
-        }
-        return result;
-    }
-
-    @Override
-    public List<String> getConnectTopics(){
-        if(CollectionUtils.isEmpty(connectTopicNames)){
-            return Lists.newArrayList();
-        }
-        return Lists.newArrayList(connectTopicNames);
-    }
-
-    @Override
-    public void persist() {
-        this.connectorKeyValueStore.persist();
-        this.taskKeyValueStore.persist();
-    }
-
-    @Override
-    public void registerListener(TargetConfigUpdateListener listener) {
-        this.targetConfigUpdateListeners.add(listener);
-    }
-
-    /**
-     * put target task key config for update
-     * @param connectorName
-     * @param configs
-     */
-    private void putTaskConfigs(String connectorName, List<TargetKeyValue> configs) {
-        List<TargetKeyValue> exist = taskKeyValueStore.get(connectorName);
-        if (null != exist && exist.size() > 0) {
-            taskKeyValueStore.remove(connectorName);
-        }
-        taskKeyValueStore.put(connectorName, configs);
-        PusherTargetEntity targetEntity = new PusherTargetEntity();
-        targetEntity.setConnectName(connectorName);
-        targetEntity.setTargetKeyValues(configs);
-        triggerListener(targetEntity);
-        persistStore();
-    }
-
-    private void persistStore() {
-
-    }
-
-    /**
-     * trigger new target task config for update
-     * @param pusherTargetEntity
-     */
-    private void triggerListener(PusherTargetEntity pusherTargetEntity) {
-        if (CollectionUtils.isEmpty(this.targetConfigUpdateListeners)) {
-            return;
-        }
-
-        for (TargetConfigUpdateListener listener : this.targetConfigUpdateListeners) {
-            listener.onConfigUpdate(pusherTargetEntity);
-        }
-    }
-
-}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
similarity index 50%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java
rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
index 88a2001..c6c7670 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
@@ -17,82 +17,31 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
 
-import io.openmessaging.connector.api.component.connector.Connector;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 
 /**
  * manage the pusher connector/task config info
  */
-public interface PusherConfigManageService {
-
-    /**
-     * Get all connector configs
-     *
-     * @return
-     */
-    Map<String, TargetKeyValue> getConnectorConfigs();
-
-    /**
-     * Put the configs.
-     *
-     * @param connectorName
-     * @param configs
-     * @return
-     * @throws Exception
-     */
-    String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception;
-
-    /**
-     * Remove the connector
-     *
-     * @param connectorName
-     */
-    void removeConnectorConfig(String connectorName);
-
-    void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs);
+public interface TargetRunnerConfigObserver {
 
     /**
-     * Get all Task configs.
-     *
+     * Get the target runner config of runtimer.
      * @return
      */
-    Map<String, List<TargetKeyValue>> getTaskConfigs();
-
-    /**
-     * get target info
-     * @return
-     */
-    Set<PusherTargetEntity> getTargetInfo();
-
-    /**
-     * Get all topics
-     * @return
-     */
-    List<String> getConnectTopics();
-
-    /**
-     * Persist all the configs in a store.
-     */
-    void persist();
+    Set<TargetRunnerConfig> getTargetRunnerConfig();
 
     /**
      * Register a listener to listen all config update operations.
      *
      * @param listener
      */
-    void registerListener(TargetConfigUpdateListener listener);
-
-    interface TargetConfigUpdateListener {
-
-        /**
-         * Invoke while connector config changed.
-         */
-        void onConfigUpdate(PusherTargetEntity targetEntity);
-    }
+    void registerListener(TargetRunnerListener listener);
 
+    /**
+     * Returns task configs grouped under string keys.
+     *
+     * @return a map from a string key to a list of {@link TargetKeyValue}
+     * @deprecated NOTE(review): no replacement is named in this change; presumably callers should
+     *     move to {@link #getTargetRunnerConfig()} - confirm.
+     */
+    @Deprecated
+    Map<String, List<TargetKeyValue>> getTaskConfigs();
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java
new file mode 100644
index 0000000..f51ac50
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+/**
+ * Target runner config observer whose changes are pushed in by the caller through
+ * {@link #add}, {@link #update} and {@link #delete}; it does no polling or watching of its own.
+ */
+@Slf4j
+public class TargetRunnerConfigOnControllerObserver extends AbstractTargetRunnerConfigObserver {
+
+    /**
+     * Returns a new, empty set on every call; this observer does not load configs itself.
+     *
+     * @return a fresh empty set
+     */
+    @Override
+    public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+        return Sets.newHashSet();
+    }
+
+    /**
+     * Forwards a newly added config to the parent's {@code onAddTargetRunner}.
+     *
+     * @param targetRunnerConfig the config that was added
+     */
+    public void add(TargetRunnerConfig targetRunnerConfig) {
+        super.onAddTargetRunner(targetRunnerConfig);
+    }
+
+    /**
+     * Forwards an updated config to the parent's {@code onUpdateTargetRunner}.
+     *
+     * @param targetRunnerConfig the new version of the config
+     */
+    public void update(TargetRunnerConfig targetRunnerConfig) {
+        super.onUpdateTargetRunner(targetRunnerConfig);
+    }
+
+    /**
+     * Forwards a deleted config to the parent's {@code onDeleteTargetRunner}.
+     *
+     * @param targetRunnerConfig the config that was deleted
+     */
+    public void delete(TargetRunnerConfig targetRunnerConfig) {
+        super.onDeleteTargetRunner(targetRunnerConfig);
+    }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
new file mode 100644
index 0000000..3d847a1
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+
+/**
+ * Observer that polls for target runner config changes every 3 seconds and publishes the
+ * difference between the freshly loaded snapshot and the configs already known to the parent.
+ *
+ * <p>{@link #getLatestTargetRunnerConfig()} is still a stub that returns {@code null}; until it is
+ * implemented every poll is a no-op instead of notifying listeners with a {@code null} config.
+ */
+@Slf4j
+public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigObserver {
+
+    private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.newThreadFactory("TargetRunnerConfigOnDBObserver", false));
+
+    /**
+     * Creates the observer and immediately schedules the polling task.
+     *
+     * @param plugin currently unused by this class
+     */
+    public TargetRunnerConfigOnDBObserver(Plugin plugin) {
+        this.addListen(this);
+    }
+
+    /**
+     * Loads the latest snapshot of target runner configs.
+     *
+     * @return {@code null} for now (not implemented); the poller treats {@code null} as
+     *     "no snapshot available" and skips the round
+     */
+    @Override
+    public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+        return null;
+    }
+
+    /**
+     * Schedules the polling task at a fixed rate of 3 seconds, starting immediately.
+     *
+     * @param pusherConfigOnFileService currently unused; the task always polls this instance
+     */
+    public void addListen(
+        TargetRunnerConfigOnDBObserver pusherConfigOnFileService) {
+        service.scheduleAtFixedRate(() -> {
+            try {
+                this.publishDbConfigChanges();
+            } catch (Throwable e) {
+                // Keep the periodic task alive: an escaping exception would cancel all future runs.
+                log.error("Watch failed.", e);
+            }
+        }, 0, 3, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Compares the latest snapshot with the known configs and raises delete/add events for the
+     * difference. A modified config is not equal to its previous version and therefore surfaces as
+     * a delete followed by an add.
+     *
+     * <p>NOTE(review): this relies on {@code TargetRunnerConfig#equals/hashCode} comparing the
+     * relevant content, which is not visible here - confirm.
+     */
+    private void publishDbConfigChanges() {
+        Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
+        if (latest == null) {
+            return;
+        }
+        Set<TargetRunnerConfig> last = super.getTargetRunnerConfig();
+        // Iterate over a snapshot: onDeleteTargetRunner mutates the parent's set, which may be the
+        // very instance returned by getTargetRunnerConfig().
+        TargetRunnerConfig[] known = last == null
+            ? new TargetRunnerConfig[0]
+            : last.toArray(new TargetRunnerConfig[0]);
+        // Deletes first, so a replaced runner is torn down before its new version is started.
+        for (TargetRunnerConfig config : known) {
+            if (!latest.contains(config)) {
+                super.onDeleteTargetRunner(config);
+            }
+        }
+        for (TargetRunnerConfig config : latest) {
+            if (last == null || !last.contains(config)) {
+                super.onAddTargetRunner(config);
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
new file mode 100644
index 0000000..27269b8
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+import org.springframework.beans.factory.annotation.Value;
+
+/**
+ * Observer that watches the store directory for file changes and publishes the difference
+ * between the freshly loaded snapshot and the configs already known to the parent.
+ *
+ * <p>{@link #getLatestTargetRunnerConfig()} is still a stub that returns {@code null}; until it is
+ * implemented {@link #diff()} is a no-op.
+ *
+ * <p>NOTE(review): the default file name is {@code targetRunner-config} while the resource added
+ * with this class is named {@code target-runner.config} - confirm which one is intended.
+ */
+@Slf4j
+public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver {
+
+    // volatile: written by the injecting thread after construction, read by the watcher thread.
+    @Value("${runtimer.storePathRootDir:}")
+    private volatile String storeRootPath;
+
+    @Value("${runtimer.store.targetRunner.config:targetRunner-config}")
+    private volatile String fileName;
+
+    private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.newThreadFactory("TargetRunnerConfigOnFileObserver", false));
+
+    /**
+     * Creates the observer and schedules the watcher task.
+     *
+     * @param plugin currently unused by this class
+     */
+    public TargetRunnerConfigOnFileObserver(Plugin plugin) {
+        // The @Value fields are injected only after this constructor returns, so they must not be
+        // read here; the task resolves them each time it (re)starts instead.
+        service.scheduleAtFixedRate(() -> {
+            String dir = this.storeRootPath;
+            if (dir == null) {
+                log.warn("Watch file skipped, the store path has not been injected yet.");
+                return;
+            }
+            log.info("Watching task file changing:{}", dir + this.fileName);
+            this.watch(dir, this);
+        }, 0, 3, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Loads the latest snapshot of target runner configs.
+     *
+     * @return {@code null} for now (not implemented); {@link #diff()} treats {@code null} as
+     *     "no snapshot available" and does nothing
+     */
+    @Override
+    public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+        return null;
+    }
+
+    /**
+     * Schedules a watcher on the given directory; the watcher is restarted every 3 seconds if it
+     * ever stops because of a failure.
+     *
+     * @param pathStr directory to watch for create/delete/modify events
+     * @param fileName only used in the log line; events of every file in the directory trigger a diff
+     * @param pusherConfigOnFileService observer whose {@link #diff()} is invoked on changes
+     */
+    public void addListen(String pathStr, String fileName,
+        TargetRunnerConfigOnFileObserver pusherConfigOnFileService) {
+        log.info("Watching task file changing:{}", pathStr + fileName);
+        service.scheduleAtFixedRate(() -> this.watch(pathStr, pusherConfigOnFileService),
+            0, 3, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Blocks on a watch service for the directory and calls {@code observer.diff()} whenever
+     * events arrive. Returns (so that the next scheduled run can re-register) when the watch key
+     * becomes invalid, the thread is interrupted or any failure occurs.
+     */
+    private void watch(String pathStr, TargetRunnerConfigOnFileObserver observer) {
+        // try-with-resources: every retry opens a new WatchService, so the old one must be closed.
+        try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
+            Path path = Paths.get(pathStr);
+            path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
+                StandardWatchEventKinds.ENTRY_MODIFY);
+            while (true) {
+                WatchKey watchKey = watchService.take();
+                if (!watchKey.pollEvents().isEmpty()) {
+                    log.info("Watched the file changed events.");
+                    observer.diff();
+                }
+                if (!watchKey.reset()) {
+                    // The directory is no longer accessible; take() would block forever on this service.
+                    log.warn("Watch key is no longer valid, the directory will be registered again.");
+                    return;
+                }
+            }
+        } catch (InterruptedException e) {
+            // Restore the flag so the executor can observe the interruption.
+            Thread.currentThread().interrupt();
+            log.warn("Watch file interrupted.", e);
+        } catch (Throwable e) {
+            // Keep the periodic task alive: an escaping exception would cancel all future runs.
+            log.error("Watch file failed.", e);
+        }
+    }
+
+    /**
+     * Compares the latest snapshot with the known configs and raises delete/add events for the
+     * difference. A modified config is not equal to its previous version and therefore surfaces as
+     * a delete followed by an add.
+     *
+     * <p>NOTE(review): this relies on {@code TargetRunnerConfig#equals/hashCode} comparing the
+     * relevant content, which is not visible here - confirm.
+     */
+    public void diff() {
+        Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
+        if (latest == null) {
+            return;
+        }
+        Set<TargetRunnerConfig> last = super.getTargetRunnerConfig();
+        // Iterate over a snapshot: onDeleteTargetRunner mutates the parent's set, which may be the
+        // very instance returned by getTargetRunnerConfig().
+        TargetRunnerConfig[] known = last == null
+            ? new TargetRunnerConfig[0]
+            : last.toArray(new TargetRunnerConfig[0]);
+        // Deletes first, so a replaced runner is torn down before its new version is started.
+        for (TargetRunnerConfig config : known) {
+            if (!latest.contains(config)) {
+                super.onDeleteTargetRunner(config);
+            }
+        }
+        for (TargetRunnerConfig config : latest) {
+            if (last == null || !last.contains(config)) {
+                super.onAddTargetRunner(config);
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/resources/target-runner.config b/adapter/runtimer/src/main/resources/target-runner.config
new file mode 100644
index 0000000..04458c6
--- /dev/null
+++ b/adapter/runtimer/src/main/resources/target-runner.config
@@ -0,0 +1,7 @@
+[
+    {
+    },
+    {
+
+    }
+]
\ No newline at end of file