You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2023/03/15 09:52:05 UTC

[rocketmq-eventbridge] branch shenlin/runtimer created (now 2a24698)

This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a change to branch shenlin/runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


      at 2a24698  feat:add TargetRunnerConfigObserver

This branch includes the following new commits:

     new 2a24698  feat:add TargetRunnerConfigObserver

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[rocketmq-eventbridge] 01/01: feat:add TargetRunnerConfigObserver

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch shenlin/runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git

commit 2a24698066e51ca6bdc74625a09228339d162ea1
Author: 2011shenlin <20...@gmail.com>
AuthorDate: Wed Mar 15 17:51:40 2023 +0800

    feat:add TargetRunnerConfigObserver
---
 .../RocketMQConnectTargetRunnerAPIImpl.java        |  40 +--
 .../eventbridge/adapter/runtimer/Runtimer.java     |  24 +-
 .../adapter/runtimer/boot/EventBusListener.java    |  80 +++---
 .../adapter/runtimer/boot/EventRuleTransfer.java   |  82 ++++---
 .../adapter/runtimer/boot/EventTargetPusher.java   |  73 +++---
 .../runtimer/boot/listener/ListenerFactory.java    |   6 +-
 .../boot/listener/TargetRunnerListener.java        |  44 ++++
 ...erTargetEntity.java => TargetRunnerConfig.java} |   4 +-
 .../AbstractTargetRunnerConfigObserver.java        |  95 ++++++++
 .../service/PusherConfigManageServiceImpl.java     | 271 ---------------------
 ...ervice.java => TargetRunnerConfigObserver.java} |  69 +-----
 .../TargetRunnerConfigOnControllerObserver.java    |  48 ++++
 .../service/TargetRunnerConfigOnDBObserver.java    |  60 +++++
 .../service/TargetRunnerConfigOnFileObserver.java  |  92 +++++++
 14 files changed, 517 insertions(+), 471 deletions(-)

diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
index fda982c..8971028 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
@@ -20,11 +20,8 @@ package org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect;
 import com.alibaba.fastjson.JSON;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
-
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.Objects;
-
 import lombok.SneakyThrows;
 import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectSourceRunnerContext;
 import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectTargetRunnerContext;
@@ -32,7 +29,6 @@ import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.ActionStatus
 import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.CreateSinkConnectorRequest;
 import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.TransformRequest;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
 import org.apache.rocketmq.eventbridge.domain.common.enums.EventTargetStatusEnum;
 import org.apache.rocketmq.eventbridge.domain.model.Component;
 import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
@@ -45,13 +41,10 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
 
     private final RocketMQConnectClient rocketMQConnectClient;
 
-    private PusherConfigManageService pusherConfigManageService;
-
     public RocketMQConnectTargetRunnerAPIImpl(EventDataRepository eventDataRepository,
-        RocketMQConnectClient rocketMQConnectClient, PusherConfigManageService pusherConfigManageService) {
+        RocketMQConnectClient rocketMQConnectClient) {
         super(eventDataRepository);
         this.rocketMQConnectClient = rocketMQConnectClient;
-        this.pusherConfigManageService = pusherConfigManageService;
     }
 
     @SneakyThrows
@@ -64,14 +57,9 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
         TransformRequest filterTransform = this.buildEventBridgeFilterTransform(filterPattern);
         TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
         TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
-                sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        if(Objects.nonNull(pusherConfigManageService)){
-            pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue);
-        }else {
-            // todo delete
-            rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
-                    sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        }
+            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+        rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
+            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
         RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
         return new Gson().toJson(context);
     }
@@ -87,17 +75,11 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
         TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
         //create
         TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
-                sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        if(Objects.nonNull(pusherConfigManageService)){
-            pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue);
-        }else {
-            // todo delete
-            //stop
-            this.delete(runContext);
-
-            rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
-                    sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        }
+            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+        //stop
+        this.delete(runContext);
+        rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
+            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
         RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
         return new Gson().toJson(context);
     }
@@ -133,6 +115,7 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
 
     /**
      * init sink task config
+     *
      * @param name
      * @param topicName
      * @param sinkClass
@@ -140,7 +123,8 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
      * @param transforms
      * @return
      */
-    private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass, Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) {
+    private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass,
+        Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) {
         CreateSinkConnectorRequest request = new CreateSinkConnectorRequest();
         request.setName(name);
         request.setTopicName(topicName);
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
index 896885d..44745d2 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -25,7 +25,7 @@ import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -55,7 +55,7 @@ public class Runtimer extends ServiceThread{
 
     private ListenerFactory listenerFactory;
 
-    private PusherConfigManageService pusherConfigManageService;
+    private TargetRunnerConfigObserver targetRunnerConfigObserver;
 
     private Map<String, List<TargetKeyValue>> taskConfigs = new HashMap<>();
 
@@ -67,21 +67,21 @@ public class Runtimer extends ServiceThread{
 
     private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((Runnable r) -> new Thread(r, "RuntimerScheduledThread"));
 
-    public Runtimer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService configManageService) {
+    public Runtimer(Plugin plugin, ListenerFactory listenerFactory, TargetRunnerConfigObserver configManageService) {
         this.plugin = plugin;
         this.listenerFactory = listenerFactory;
-        this.pusherConfigManageService = configManageService;
+        this.targetRunnerConfigObserver = configManageService;
     }
 
     @PostConstruct
     public void initAndStart() {
         logger.info("init runtimer task config");
-        this.taskConfigs = pusherConfigManageService.getTaskConfigs();
-        listener = new EventBusListener(listenerFactory, pusherConfigManageService);
+        this.taskConfigs = targetRunnerConfigObserver.getTaskConfigs();
+        listener = new EventBusListener(listenerFactory, targetRunnerConfigObserver);
         listener.initOrUpdateListenConsumer(taskConfigs);
-        transfer = new EventRuleTransfer(plugin, listenerFactory, pusherConfigManageService);
+        transfer = new EventRuleTransfer(plugin, listenerFactory, targetRunnerConfigObserver);
         transfer.initOrUpdateTaskTransform(taskConfigs);
-        pusher = new EventTargetPusher(plugin, listenerFactory, pusherConfigManageService);
+        pusher = new EventTargetPusher(plugin, listenerFactory, targetRunnerConfigObserver);
         pusher.initOrUpdatePusherTask(taskConfigs);
         startRuntimer();
     }
@@ -105,13 +105,5 @@ public class Runtimer extends ServiceThread{
 
         pusher.start();
 
-        scheduledExecutorService.scheduleAtFixedRate(() -> {
-            try {
-                this.pusherConfigManageService.persist();
-            } catch (Exception e) {
-                logger.error("schedule persist config error.", e);
-            }
-        }, 500, 500, TimeUnit.MILLISECONDS);
-
     }
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
index 0c40235..8f3c707 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
@@ -25,31 +25,43 @@ import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.internal.DefaultKeyValue;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.QueueState;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.CollectionUtils;
 
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.*;
-
 /**
  * listen the event and offer to queue
+ *
  * @author artisan
  */
-public class EventBusListener extends ServiceThread {
+public class EventBusListener extends ServiceThread implements TargetRunnerListener {
 
     private Logger logger = LoggerFactory.getLogger(EventBusListener.class);
 
@@ -63,32 +75,33 @@ public class EventBusListener extends ServiceThread {
 
     private ListenerFactory listenerFactory;
 
-    private PusherConfigManageService pusherConfigManageService;
+    private TargetRunnerConfigObserver targetRunnerConfigObserver;
 
-    private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
+    private ExecutorService executorService = new ThreadPoolExecutor(20, 60, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
 
     private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000);
 
-    public EventBusListener(ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
+    public EventBusListener(ListenerFactory listenerFactory, TargetRunnerConfigObserver targetRunnerConfigObserver) {
         this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256);
         this.messageQueuesStateMap = new ConcurrentHashMap<>(256);
         this.listenerFactory = listenerFactory;
-        this.pusherConfigManageService = pusherConfigManageService;
-        this.pusherConfigManageService.registerListener(new ConsumerUpdateListenerImpl());
+        this.targetRunnerConfigObserver = targetRunnerConfigObserver;
+        this.targetRunnerConfigObserver.registerListener(this);
     }
 
     /**
      * init listen consumer
+     *
      * @param taskConfig
      */
-    public void initOrUpdateListenConsumer(Map<String, List<TargetKeyValue>> taskConfig){
-        if(MapUtils.isEmpty(taskConfig)){
+    public void initOrUpdateListenConsumer(Map<String, List<TargetKeyValue>> taskConfig) {
+        if (MapUtils.isEmpty(taskConfig)) {
             logger.warn("initListenConsumer by taskConfig param is empty");
             return;
         }
         List<TargetKeyValue> targetKeyValues = initTaskKeyInfo(taskConfig);
         this.topics.addAll(listenerFactory.parseTopicListByList(targetKeyValues));
-        for (String topic : topics){
+        for (String topic : topics) {
             DefaultLitePullConsumer pullConsumer = listenerFactory.initDefaultMQPullConsumer(topic);
             listenConsumer.add(pullConsumer);
         }
@@ -97,12 +110,13 @@ public class EventBusListener extends ServiceThread {
 
     /**
      * init all task config info
+     *
      * @param taskConfig
      * @return
      */
     private List<TargetKeyValue> initTaskKeyInfo(Map<String, List<TargetKeyValue>> taskConfig) {
         Set<TargetKeyValue> targetKeyValues = new HashSet<>();
-        for(String connectName : taskConfig.keySet()){
+        for (String connectName : taskConfig.keySet()) {
             targetKeyValues.addAll(taskConfig.get(connectName));
         }
         return Lists.newArrayList(targetKeyValues);
@@ -110,13 +124,13 @@ public class EventBusListener extends ServiceThread {
 
     @Override
     public void run() {
-        while (!stopped){
-            if(CollectionUtils.isEmpty(listenConsumer)){
+        while (!stopped) {
+            if (CollectionUtils.isEmpty(listenConsumer)) {
                 logger.info("current listen consumer is empty");
                 this.waitForRunning(1000);
                 continue;
             }
-            for(DefaultLitePullConsumer pullConsumer : listenConsumer) {
+            for (DefaultLitePullConsumer pullConsumer : listenConsumer) {
                 executorService.submit(() -> {
                     try {
                         List<MessageExt> messageExts = pullConsumer.poll(3000);
@@ -144,6 +158,7 @@ public class EventBusListener extends ServiceThread {
 
     /**
      * MessageExt convert to connect record
+     *
      * @param messageExt
      * @return
      */
@@ -175,14 +190,21 @@ public class EventBusListener extends ServiceThread {
     /**
      * consumer update listener
      */
-    class ConsumerUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
-
-        @Override
-        public void onConfigUpdate(PusherTargetEntity targetEntity) {
-            logger.info("consumer update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
-            Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
-            lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
-            initOrUpdateListenConsumer(lastTargetMap);
-        }
+    @Override
+    public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        logger.info("consumer update by new target config changed, target info -{}", JSON.toJSONString(targetRunnerConfig));
+        Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
+        lastTargetMap.put(targetRunnerConfig.getConnectName(), targetRunnerConfig.getTargetKeyValues());
+        initOrUpdateListenConsumer(lastTargetMap);
     }
+
+    @Override
+    public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+    }
+
+    @Override
+    public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+    }
+
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index f14fdec..be73b2a 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -19,44 +19,54 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
 
 import com.alibaba.fastjson.JSON;
 import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.*;
-
 /**
  * receive event and transfer the rule to pusher
  */
-public class EventRuleTransfer extends ServiceThread {
+public class EventRuleTransfer extends ServiceThread implements TargetRunnerListener {
 
     private static final Logger logger = LoggerFactory.getLogger(EventRuleTransfer.class);
 
     private ListenerFactory listenerFactory;
 
-    private PusherConfigManageService pusherConfigManageService;
+    private TargetRunnerConfigObserver targetRunnerConfigObserver;
 
     private Plugin plugin;
 
     Map<TargetKeyValue/*taskConfig*/, TransformEngine<ConnectRecord>/*taskTransform*/> taskTransformMap = new ConcurrentHashMap<>(20);
 
-    private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
+    private ExecutorService executorService = new ThreadPoolExecutor(20, 60, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
 
-    public EventRuleTransfer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
+    public EventRuleTransfer(Plugin plugin, ListenerFactory listenerFactory,
+        TargetRunnerConfigObserver targetRunnerConfigObserver) {
         this.plugin = plugin;
         this.listenerFactory = listenerFactory;
-        this.pusherConfigManageService = pusherConfigManageService;
-        this.pusherConfigManageService.registerListener(new TransformUpdateListenerImpl());
+        this.targetRunnerConfigObserver = targetRunnerConfigObserver;
+        this.targetRunnerConfigObserver.registerListener(this);
     }
 
-    public void initOrUpdateTaskTransform(Map<String, List<TargetKeyValue>> taskConfig){
+    public void initOrUpdateTaskTransform(Map<String, List<TargetKeyValue>> taskConfig) {
         this.taskTransformMap.putAll(initSinkTaskTransformInfo(taskConfig));
     }
 
@@ -67,9 +77,9 @@ public class EventRuleTransfer extends ServiceThread {
 
     @Override
     public void run() {
-        while (!stopped){
+        while (!stopped) {
             ConnectRecord eventRecord = listenerFactory.takeEventRecord();
-            if(Objects.isNull(eventRecord)){
+            if (Objects.isNull(eventRecord)) {
                 logger.info("listen eventRecord is empty, continue by curTime - {}", System.currentTimeMillis());
                 this.waitForRunning(1000);
                 continue;
@@ -77,16 +87,16 @@ public class EventRuleTransfer extends ServiceThread {
             executorService.submit(() -> {
                 // extension add sub
                 // rule - target
-                for (TargetKeyValue targetKeyValue : taskTransformMap.keySet()){
+                for (TargetKeyValue targetKeyValue : taskTransformMap.keySet()) {
                     // add threadPool for cup task
                     // attention coreSize
                     TransformEngine<ConnectRecord> transformEngine = taskTransformMap.get(targetKeyValue);
                     ConnectRecord transformRecord = transformEngine.doTransforms(eventRecord);
-                    if(Objects.isNull(transformRecord)){
+                    if (Objects.isNull(transformRecord)) {
                         continue;
                     }
                     // a bean for maintain
-                    Map<TargetKeyValue,ConnectRecord> targetMap = new HashMap<>();
+                    Map<TargetKeyValue, ConnectRecord> targetMap = new HashMap<>();
                     targetMap.put(targetKeyValue, transformRecord);
                     listenerFactory.offerTargetTaskQueue(targetMap);
 
@@ -101,20 +111,20 @@ public class EventRuleTransfer extends ServiceThread {
     }
 
     /**
-     * Init sink task transform map
-     * key: task config
-     * value: transformEngine
+     * Init sink task transform map key: task config value: transformEngine
+     *
      * @param taskConfig
      * @return
      */
-    private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(Map<String, List<TargetKeyValue>> taskConfig) {
+    private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(
+        Map<String, List<TargetKeyValue>> taskConfig) {
         Map<TargetKeyValue, TransformEngine<ConnectRecord>> curTaskTransformMap = new HashMap<>();
         Set<TargetKeyValue> allTaskKeySet = new HashSet<>();
-        for(String connectName : taskConfig.keySet()){
+        for (String connectName : taskConfig.keySet()) {
             List<TargetKeyValue> taskKeyList = taskConfig.get(connectName);
             allTaskKeySet.addAll(new HashSet<>(taskKeyList));
         }
-        for(TargetKeyValue keyValue : allTaskKeySet){
+        for (TargetKeyValue keyValue : allTaskKeySet) {
             TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(keyValue, plugin);
             curTaskTransformMap.put(keyValue, transformChain);
         }
@@ -125,14 +135,20 @@ public class EventRuleTransfer extends ServiceThread {
     /**
      * transform update listener
      */
-    class TransformUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
-
-        @Override
-        public void onConfigUpdate(PusherTargetEntity targetEntity) {
-            logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
-            Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
-            lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
-            initOrUpdateTaskTransform(lastTargetMap);
-        }
+    @Override
+    public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetRunnerConfig));
+        Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
+        lastTargetMap.put(targetRunnerConfig.getConnectName(), targetRunnerConfig.getTargetKeyValues());
+        initOrUpdateTaskTransform(lastTargetMap);
+    }
+
+    @Override
+    public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+    }
+
+    @Override
+    public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
     }
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
index d324fe4..bd9ddb0 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -22,27 +22,32 @@ import com.google.common.collect.Lists;
 import io.netty.util.internal.ConcurrentSet;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.commons.collections.MapUtils;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-
 /**
  * event target push to sink task
+ *
  * @author artisan
  */
-public class EventTargetPusher extends ServiceThread {
+public class EventTargetPusher extends ServiceThread implements TargetRunnerListener {
 
     private static final Logger logger = LoggerFactory.getLogger(EventTargetPusher.class);
 
@@ -56,29 +61,31 @@ public class EventTargetPusher extends ServiceThread {
 
     private ListenerFactory listenerFactory;
 
-    private PusherConfigManageService pusherConfigManageService;
+    private TargetRunnerConfigObserver targetRunnerConfigObserver;
 
     private List<SinkTask> pusherTasks = new CopyOnWriteArrayList<>();
 
-    public EventTargetPusher(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
+    public EventTargetPusher(Plugin plugin, ListenerFactory listenerFactory,
+        TargetRunnerConfigObserver targetRunnerConfigObserver) {
         this.plugin = plugin;
         this.listenerFactory = listenerFactory;
-        this.pusherConfigManageService = pusherConfigManageService;
-        this.pusherConfigManageService.registerListener(new TargetUpdateListenerImpl());
+        this.targetRunnerConfigObserver = targetRunnerConfigObserver;
+        this.targetRunnerConfigObserver.registerListener(this);
     }
 
     /**
      * init running tasks
+     *
      * @param taskConfig
      */
-    public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig){
+    public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig) {
         Set<TargetKeyValue> taskProperty = new HashSet<>();
-        for(String connectName : taskConfig.keySet()){
+        for (String connectName : taskConfig.keySet()) {
             List<TargetKeyValue> targetKeyValues = taskConfig.get(connectName);
             taskProperty.addAll(new HashSet<>(targetKeyValues));
         }
-        for(TargetKeyValue targetKeyValue : taskProperty){
-            try{
+        for (TargetKeyValue targetKeyValue : taskProperty) {
+            try {
                 String taskClass = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
                 ClassLoader loader = plugin.getPluginClassLoader(taskClass);
                 Class taskClazz;
@@ -98,7 +105,7 @@ public class EventTargetPusher extends ServiceThread {
                     Plugin.compareAndSwapLoaders(loader);
                 }
                 logger.info("init target task succeed, target key - {}", JSON.toJSONString(targetKeyValue));
-            }catch (Exception exception){
+            } catch (Exception exception) {
                 exception.printStackTrace();
             }
         }
@@ -106,9 +113,9 @@ public class EventTargetPusher extends ServiceThread {
 
     @Override
     public void run() {
-        while (!stopped){
+        while (!stopped) {
             Map<TargetKeyValue, ConnectRecord> taskPusher = listenerFactory.takeTargetMap();
-            if(MapUtils.isEmpty(taskPusher)){
+            if (MapUtils.isEmpty(taskPusher)) {
                 logger.info("current target pusher is empty");
                 this.waitForRunning(1000);
                 continue;
@@ -121,8 +128,8 @@ public class EventTargetPusher extends ServiceThread {
             // also add in ConnectRecord class system property
             String taskPushName = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
             // add thread pool
-            for(SinkTask sinkTask : pusherTasks){
-                if(sinkTask.getClass().getName().equals(taskPushName)){
+            for (SinkTask sinkTask : pusherTasks) {
+                if (sinkTask.getClass().getName().equals(taskPushName)) {
                     sinkTask.put(Lists.newArrayList(taskPusher.get(targetKeyValue)));
                 }
             }
@@ -137,14 +144,22 @@ public class EventTargetPusher extends ServiceThread {
     /**
      * target update listener
      */
-    class TargetUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
-
-        @Override
-        public void onConfigUpdate(PusherTargetEntity targetEntity) {
-            logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
-            Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
-            lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
-            initOrUpdatePusherTask(lastTargetMap);
-        }
+
+    @Override
+    public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetRunnerConfig));
+        Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
+        lastTargetMap.put(targetRunnerConfig.getConnectName(), targetRunnerConfig.getTargetKeyValues());
+        initOrUpdatePusherTask(lastTargetMap);
     }
+
+    @Override
+    public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+    }
+
+    @Override
+    public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+    }
+
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
index 07f010e..44c29bb 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
@@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
@@ -52,7 +52,7 @@ public class ListenerFactory {
 
     public static final String QUEUE_OFFSET = "queueOffset";
 
-    private BlockingQueue<PusherTargetEntity> pusherTargetQueue = new LinkedBlockingQueue<>(1000);
+    private BlockingQueue<TargetRunnerConfig> pusherTargetQueue = new LinkedBlockingQueue<>(1000);
 
     private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000);
 
@@ -76,7 +76,7 @@ public class ListenerFactory {
         return consumer;
     }
 
-    public PusherTargetEntity takeTaskConfig(){
+    public TargetRunnerConfig takeTaskConfig(){
         if(pusherTargetQueue.isEmpty()){
             return null;
         }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
new file mode 100644
index 0000000..f528b7b
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+public interface TargetRunnerListener {
+    // Callbacks invoked when a target runner config is added, updated or deleted.
+    /**
+     * Called when a new target runner is added to the runtimer.
+     *
+     * @param targetRunnerConfig the config of the added target runner
+     */
+    void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+    /**
+     * Called when an existing target runner has been updated.
+     *
+     * @param targetRunnerConfig the config of the updated target runner
+     */
+    void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+    /**
+     * Called when an existing target runner has been deleted from the runtimer.
+     *
+     * @param targetRunnerConfig the config of the deleted target runner
+     */
+    void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
similarity index 92%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java
rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
index dc7ced4..52525cf 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
@@ -26,7 +26,7 @@ import java.util.List;
  * pusher target key config
  */
 @Data
-public class PusherTargetEntity implements Serializable {
+public class TargetRunnerConfig implements Serializable {
 
     private String connectName;
 
@@ -35,7 +35,7 @@ public class PusherTargetEntity implements Serializable {
     @Override
     public boolean equals(Object object){
         if (object != null && object.getClass() == this.getClass()) {
-            PusherTargetEntity targetEntity = (PusherTargetEntity) object;
+            TargetRunnerConfig targetEntity = (TargetRunnerConfig) object;
             return this.connectName.equals(targetEntity.getConnectName())
                     && this.targetKeyValues.size() == targetEntity.getTargetKeyValues().size()
                     && this.targetKeyValues.containsAll(targetEntity.getTargetKeyValues());
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
new file mode 100644
index 0000000..619bc63
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import com.google.common.collect.Sets;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+@Service
+public abstract class AbstractTargetRunnerConfigObserver implements TargetRunnerConfigObserver {
+    /** Target runner configs currently tracked by this observer. */
+    private Set<TargetRunnerConfig> targetRunnerConfigs = Sets.newHashSet();
+
+    /**
+     * Listeners notified whenever a tracked config is added, updated or deleted.
+     */
+    private Set<TargetRunnerListener> targetRunnerConfigListeners = new HashSet<>();
+    /** Returns the internal, mutable set of tracked configs; no defensive copy is made. */
+    public Set<TargetRunnerConfig> getTargetRunnerConfig() {
+        return targetRunnerConfigs;
+    }
+    /** Implemented by each subclass; presumably returns the most recent configs from its source. */
+    public abstract Set<TargetRunnerConfig> getLatestTargetRunnerConfig();
+    /** Adds the listener to the set notified by the on*TargetRunner methods of this class. */
+    @Override
+    public void registerListener(TargetRunnerListener listener) {
+        this.targetRunnerConfigListeners.add(listener);
+    }
+    /** Tracks the new config, then calls onAddTargetRunner on every registered listener. */
+    void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        this.targetRunnerConfigs.add(targetRunnerConfig);
+        if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+            return;
+        }
+        for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+            listener.onAddTargetRunner(targetRunnerConfig);
+        }
+    }
+
+    /**
+     * Adds the updated config to the tracked set, then calls onUpdateTargetRunner on every listener.
+     * NOTE(review): nothing is removed here, so an earlier version of the config may stay tracked.
+     * @param targetRunnerConfig the config of the updated target runner
+     */
+    void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        this.targetRunnerConfigs.add(targetRunnerConfig);
+        if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+            return;
+        }
+        for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+            listener.onUpdateTargetRunner(targetRunnerConfig);
+        }
+    }
+
+    /**
+     * Removes the config from the tracked set, then calls onDeleteTargetRunner on every listener.
+     *
+     * @param targetRunnerConfig the config of the deleted target runner
+     */
+    void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        this.targetRunnerConfigs.remove(targetRunnerConfig);
+        if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+            return;
+        }
+        for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+            listener.onDeleteTargetRunner(targetRunnerConfig);
+        }
+    }
+    /** Placeholder: always returns {@code null}. NOTE(review): callers must handle null. */
+    public Map<String, List<TargetKeyValue>> getTaskConfigs() {
+        return null;
+    }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
deleted file mode 100644
index 58a66fd..0000000
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
-
-import com.google.common.collect.Lists;
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.component.connector.Connector;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.FilePathConfigUtil;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.FileBaseKeyValueStore;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.KeyValueStore;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.JsonConverter;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.ListConverter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-import org.springframework.util.CollectionUtils;
-
-import javax.annotation.PostConstruct;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-@Service
-public class PusherConfigManageServiceImpl implements PusherConfigManageService {
-
-    /**
-     * plugin for recognize class loader
-     */
-    private Plugin plugin;
-
-    /**
-     * Current connector configs in the store.
-     */
-    private KeyValueStore<String, TargetKeyValue> connectorKeyValueStore;
-
-    /**
-     * Current task configs in the store.
-     */
-    private KeyValueStore<String, List<TargetKeyValue>> taskKeyValueStore;
-
-    /**
-     * All listeners to trigger while config change.
-     */
-    private Set<TargetConfigUpdateListener> targetConfigUpdateListeners;
-
-    private Set<String> connectTopicNames;
-
-    @Value("${runtimer.storePathRootDir:}")
-    private String storeRootPath;
-
-    public PusherConfigManageServiceImpl(Plugin plugin){
-        this.plugin = plugin;
-        this.connectTopicNames = new CopyOnWriteArraySet<>();
-        this.targetConfigUpdateListeners = new HashSet<>();
-    }
-
-    @PostConstruct
-    public void initStoreKeyValue(){
-        this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
-                FilePathConfigUtil.getConnectorConfigPath(this.storeRootPath),
-                new JsonConverter(),
-                new JsonConverter(TargetKeyValue.class));
-        this.taskKeyValueStore = new FileBaseKeyValueStore<>(
-                FilePathConfigUtil.getTaskConfigPath(this.storeRootPath),
-                new JsonConverter(),
-                new ListConverter(TargetKeyValue.class));
-        this.connectorKeyValueStore.load();
-        this.taskKeyValueStore.load();
-    }
-
-    /**
-     * get all connector configs enabled
-     *
-     * @return
-     */
-    @Override
-    public Map<String, TargetKeyValue> getConnectorConfigs() {
-        Map<String, TargetKeyValue> result = new HashMap<>();
-        Map<String, TargetKeyValue> connectorConfigs = connectorKeyValueStore.getKVMap();
-        for (String connectorName : connectorConfigs.keySet()) {
-            TargetKeyValue config = connectorConfigs.get(connectorName);
-            if (0 != config.getInt(RuntimerConfigDefine.CONFIG_DELETED)) {
-                continue;
-            }
-            result.put(connectorName, config);
-        }
-        return result;
-    }
-
-    @Override
-    public String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception {
-        TargetKeyValue exist = connectorKeyValueStore.get(connectorName);
-        if (null != exist) {
-            Long updateTimestamp = exist.getLong(RuntimerConfigDefine.UPDATE_TIMESTAMP);
-            if (null != updateTimestamp) {
-                configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, updateTimestamp);
-            }
-        }
-        if (configs.equals(exist)) {
-            return "Connector with same config already exist.";
-        }
-
-        Long currentTimestamp = System.currentTimeMillis();
-        configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
-        for (String requireConfig : RuntimerConfigDefine.REQUEST_CONFIG) {
-            if (!configs.containsKey(requireConfig)) {
-                return "Request config key: " + requireConfig;
-            }
-        }
-
-        String connectorClass = configs.getString(RuntimerConfigDefine.CONNECTOR_CLASS);
-        ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
-        Class clazz;
-        if (null != classLoader) {
-            clazz = Class.forName(connectorClass, true, classLoader);
-        } else {
-            clazz = Class.forName(connectorClass);
-        }
-        final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
-        connector.validate(configs);
-        connector.init(configs);
-        connectorKeyValueStore.put(connectorName, configs);
-        recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
-        return "";
-    }
-
-    @Override
-    public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs) {
-        int maxTask = configs.getInt(RuntimerConfigDefine.MAX_TASK, 1);
-        List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
-        List<TargetKeyValue> converterdConfigs = new ArrayList<>();
-        for (KeyValue keyValue : taskConfigs) {
-            TargetKeyValue newKeyValue = new TargetKeyValue();
-            for (String key : keyValue.keySet()) {
-                newKeyValue.put(key, keyValue.getString(key));
-            }
-            newKeyValue.put(RuntimerConfigDefine.TASK_CLASS, connector.taskClass().getName());
-            newKeyValue.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
-
-            newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
-            newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAMES));
-            Set<String> connectConfigKeySet = configs.keySet();
-            for (String connectConfigKey : connectConfigKeySet) {
-                if (connectConfigKey.startsWith(RuntimerConfigDefine.TRANSFORMS)) {
-                    newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
-                }
-            }
-            converterdConfigs.add(newKeyValue);
-            connectTopicNames.add(configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
-        }
-        putTaskConfigs(connectorName, converterdConfigs);
-    }
-
-    @Override
-    public void removeConnectorConfig(String connectorName) {
-        TargetKeyValue config = connectorKeyValueStore.get(connectorName);
-        if(Objects.isNull(config)){
-            return;
-        }
-        config.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, System.currentTimeMillis());
-        config.put(RuntimerConfigDefine.CONFIG_DELETED, 1);
-        List<TargetKeyValue> taskConfigList = taskKeyValueStore.get(connectorName);
-        taskConfigList.add(config);
-        connectorKeyValueStore.put(connectorName, config);
-        putTaskConfigs(connectorName, taskConfigList);
-    }
-
-    @Override
-    public Map<String, List<TargetKeyValue>> getTaskConfigs() {
-        Map<String, List<TargetKeyValue>> result = new HashMap<>();
-        Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap();
-        Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs();
-        for (String connectorName : taskConfigs.keySet()) {
-            if (!filteredConnector.containsKey(connectorName)) {
-                continue;
-            }
-            result.put(connectorName, taskConfigs.get(connectorName));
-        }
-        return result;
-    }
-
-    @Override
-    public Set<PusherTargetEntity> getTargetInfo() {
-        Set<PusherTargetEntity> result = new HashSet<>();
-        Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap();
-        Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs();
-        for (String connectorName : taskConfigs.keySet()) {
-            if (!filteredConnector.containsKey(connectorName)) {
-                continue;
-            }
-            PusherTargetEntity targetEntity = new PusherTargetEntity();
-            targetEntity.setConnectName(connectorName);
-            targetEntity.setTargetKeyValues(taskConfigs.get(connectorName));
-            result.add(targetEntity);
-        }
-        return result;
-    }
-
-    @Override
-    public List<String> getConnectTopics(){
-        if(CollectionUtils.isEmpty(connectTopicNames)){
-            return Lists.newArrayList();
-        }
-        return Lists.newArrayList(connectTopicNames);
-    }
-
-    @Override
-    public void persist() {
-        this.connectorKeyValueStore.persist();
-        this.taskKeyValueStore.persist();
-    }
-
-    @Override
-    public void registerListener(TargetConfigUpdateListener listener) {
-        this.targetConfigUpdateListeners.add(listener);
-    }
-
-    /**
-     * put target task key config for update
-     * @param connectorName
-     * @param configs
-     */
-    private void putTaskConfigs(String connectorName, List<TargetKeyValue> configs) {
-        List<TargetKeyValue> exist = taskKeyValueStore.get(connectorName);
-        if (null != exist && exist.size() > 0) {
-            taskKeyValueStore.remove(connectorName);
-        }
-        taskKeyValueStore.put(connectorName, configs);
-        PusherTargetEntity targetEntity = new PusherTargetEntity();
-        targetEntity.setConnectName(connectorName);
-        targetEntity.setTargetKeyValues(configs);
-        triggerListener(targetEntity);
-        persistStore();
-    }
-
-    private void persistStore() {
-
-    }
-
-    /**
-     * trigger new target task config for update
-     * @param pusherTargetEntity
-     */
-    private void triggerListener(PusherTargetEntity pusherTargetEntity) {
-        if (CollectionUtils.isEmpty(this.targetConfigUpdateListeners)) {
-            return;
-        }
-
-        for (TargetConfigUpdateListener listener : this.targetConfigUpdateListeners) {
-            listener.onConfigUpdate(pusherTargetEntity);
-        }
-    }
-
-}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
similarity index 50%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java
rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
index 88a2001..c6c7670 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
@@ -17,82 +17,31 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
 
-import io.openmessaging.connector.api.component.connector.Connector;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 
 /**
  * manage the pusher connector/task config info
  */
-public interface PusherConfigManageService {
-
-    /**
-     * Get all connector configs
-     *
-     * @return
-     */
-    Map<String, TargetKeyValue> getConnectorConfigs();
-
-    /**
-     * Put the configs.
-     *
-     * @param connectorName
-     * @param configs
-     * @return
-     * @throws Exception
-     */
-    String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception;
-
-    /**
-     * Remove the connector
-     *
-     * @param connectorName
-     */
-    void removeConnectorConfig(String connectorName);
-
-    void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs);
+public interface TargetRunnerConfigObserver {
 
     /**
-     * Get all Task configs.
-     *
+     * Get the target runner config of runtimer.
      * @return
      */
-    Map<String, List<TargetKeyValue>> getTaskConfigs();
-
-    /**
-     * get target info
-     * @return
-     */
-    Set<PusherTargetEntity> getTargetInfo();
-
-    /**
-     * Get all topics
-     * @return
-     */
-    List<String> getConnectTopics();
-
-    /**
-     * Persist all the configs in a store.
-     */
-    void persist();
+    Set<TargetRunnerConfig> getTargetRunnerConfig();
 
     /**
      * Register a listener to listen all config update operations.
      *
      * @param listener
      */
-    void registerListener(TargetConfigUpdateListener listener);
-
-    interface TargetConfigUpdateListener {
-
-        /**
-         * Invoke while connector config changed.
-         */
-        void onConfigUpdate(PusherTargetEntity targetEntity);
-    }
+    void registerListener(TargetRunnerListener listener);
 
+    @Deprecated
+    Map<String, List<TargetKeyValue>> getTaskConfigs();
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java
new file mode 100644
index 0000000..f51ac50
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+@Slf4j
+public class TargetRunnerConfigOnControllerObserver extends AbstractTargetRunnerConfigObserver {
+    /** Always returns a new, empty set. */
+    @Override
+    public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+        return Sets.newHashSet();
+    }
+    /** Public entry point that forwards the config to {@code super.onAddTargetRunner}. */
+    public void add(TargetRunnerConfig targetRunnerConfig) {
+        super.onAddTargetRunner(targetRunnerConfig);
+    }
+    /** Public entry point that forwards the config to {@code super.onUpdateTargetRunner}. */
+    public void update(TargetRunnerConfig targetRunnerConfig) {
+        super.onUpdateTargetRunner(targetRunnerConfig);
+    }
+    /** Public entry point that forwards the config to {@code super.onDeleteTargetRunner}. */
+    public void delete(TargetRunnerConfig targetRunnerConfig) {
+        super.onDeleteTargetRunner(targetRunnerConfig);
+    }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
new file mode 100644
index 0000000..3d847a1
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+
+/**
+ * Polls {@link #getLatestTargetRunnerConfig()} on a fixed schedule and forwards the differences
+ * against the configs held by the base class to the base-class add/delete handlers.
+ */
+@Slf4j
+public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigObserver {
+
+    /** Single polling thread shared by every instance of this class. */
+    private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.newThreadFactory("TargetRunnerConfigOnDBObserver", false));
+
+    /**
+     * Creates the observer and immediately schedules its polling task.
+     *
+     * @param plugin not used by this constructor
+     */
+    public TargetRunnerConfigOnDBObserver(Plugin plugin) {
+        this.addListen(this);
+    }
+
+    /**
+     * Loading is not implemented yet.
+     *
+     * @return always {@code null}; the poller treats {@code null} as "no snapshot available" and
+     *     skips the round, whereas an empty set would mark every known config as deleted
+     */
+    @Override
+    public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+        return null;
+    }
+
+    /**
+     * Schedules a poll that runs immediately and then every 3 seconds.
+     *
+     * @param pusherConfigOnFileService not consulted; the poll always runs against this instance
+     */
+    public void addListen(
+        TargetRunnerConfigOnDBObserver pusherConfigOnFileService) {
+        service.scheduleAtFixedRate(() -> {
+            try {
+                this.dispatchChanges();
+            } catch (Throwable e) {
+                // An escaping exception would cancel all future runs of a fixed-rate task.
+                log.error("Watch failed.", e);
+            }
+        }, 0, 3, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Reports configs that are no longer in the latest snapshot as deleted, then new ones as added.
+     * NOTE(review): membership relies on TargetRunnerConfig equals/hashCode, so a modified config
+     * shows up as delete + add; no identity key is visible here to route it to onUpdateTargetRunner.
+     */
+    private void dispatchChanges() {
+        Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
+        if (latest == null) {
+            return;
+        }
+        Set<TargetRunnerConfig> current = super.getTargetRunnerConfig();
+        // Copy first: the handlers below may modify the set the base class hands out.
+        Set<TargetRunnerConfig> last = new HashSet<>();
+        if (current != null) {
+            last.addAll(current);
+        }
+        for (TargetRunnerConfig config : last) {
+            if (!latest.contains(config)) {
+                super.onDeleteTargetRunner(config);
+            }
+        }
+        for (TargetRunnerConfig config : latest) {
+            if (!last.contains(config)) {
+                super.onAddTargetRunner(config);
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
new file mode 100644
index 0000000..27269b8
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+import org.springframework.beans.factory.annotation.Value;
+
+/**
+ * Watches a directory for create/delete/modify events and, on each batch of events, forwards
+ * the difference between the latest configs and the configs held by the base class to the
+ * base-class add/delete handlers.
+ */
+@Slf4j
+public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver {
+
+    /** Directory to watch; the property defaults to the empty string. */
+    @Value("${runtimer.storePathRootDir:}")
+    private String storeRootPath;
+
+    /** Config file name; only used in the start-up log line. */
+    @Value("${runtimer.store.targetRunner.config:targetRunner-config}")
+    private String fileName;
+
+    /** Single watcher thread shared by every instance of this class. */
+    private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.newThreadFactory("TargetRunnerConfigOnFileObserver", false));
+
+    /**
+     * Creates the observer and immediately schedules its watch task.
+     *
+     * @param plugin not used by this constructor
+     */
+    public TargetRunnerConfigOnFileObserver(Plugin plugin) {
+        // Both fields are still null at this point (field injection cannot happen before the
+        // constructor returns), so the watch task resolves the directory lazily.
+        this.addListen(storeRootPath, fileName, this);
+    }
+
+    /**
+     * Loading is not implemented yet.
+     *
+     * @return always {@code null}; {@link #diff()} treats {@code null} as "no snapshot available"
+     *     and does nothing, whereas an empty set would mark every known config as deleted
+     */
+    @Override
+    public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+        return null;
+    }
+
+    /**
+     * Schedules the watch task. Each run registers a fresh watcher and blocks on it until it
+     * fails or is interrupted; the 3-second schedule then starts the next run.
+     *
+     * @param pathStr directory to watch; when {@code null}, {@code storeRootPath} is read at run time
+     * @param fileName only used in the log line
+     * @param pusherConfigOnFileService observer whose {@link #diff()} is called on each batch of events
+     */
+    public void addListen(String pathStr, String fileName,
+        TargetRunnerConfigOnFileObserver pusherConfigOnFileService) {
+        log.info("Watching task file changing:{}", pathStr + fileName);
+        service.scheduleAtFixedRate(() -> {
+            String dir = pathStr != null ? pathStr : this.storeRootPath;
+            if (dir == null) {
+                log.warn("Watch directory is not configured yet, retrying later.");
+                return;
+            }
+            // Closed on every exit path so a failed run does not leak the watcher.
+            try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
+                Path path = Paths.get(dir);
+                path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
+                    StandardWatchEventKinds.ENTRY_MODIFY);
+                while (true) {
+                    WatchKey watchKey = watchService.take();
+                    if (!watchKey.pollEvents().isEmpty()) {
+                        log.info("Watched the file changed events.");
+                        pusherConfigOnFileService.diff();
+                    }
+                    if (!watchKey.reset()) {
+                        // The key is no longer valid; leave so the next run registers again.
+                        log.warn("Watch key is no longer valid, re-registering: {}", dir);
+                        break;
+                    }
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("Watch file interrupted.", e);
+            } catch (Throwable e) {
+                log.error("Watch file failed.", e);
+            }
+        }, 0, 3, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Reports configs that are no longer in the latest snapshot as deleted, then new ones as added.
+     * NOTE(review): membership relies on TargetRunnerConfig equals/hashCode, so a modified config
+     * shows up as delete + add; no identity key is visible here to route it to onUpdateTargetRunner.
+     */
+    public void diff() {
+        Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
+        if (latest == null) {
+            return;
+        }
+        Set<TargetRunnerConfig> current = super.getTargetRunnerConfig();
+        // Copy first: the handlers below may modify the set the base class hands out.
+        Set<TargetRunnerConfig> last = new HashSet<>();
+        if (current != null) {
+            last.addAll(current);
+        }
+        for (TargetRunnerConfig config : last) {
+            if (!latest.contains(config)) {
+                super.onDeleteTargetRunner(config);
+            }
+        }
+        for (TargetRunnerConfig config : latest) {
+            if (!last.contains(config)) {
+                super.onAddTargetRunner(config);
+            }
+        }
+    }
+
+}
\ No newline at end of file