You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2023/03/15 09:52:06 UTC

[rocketmq-eventbridge] 01/01: feat:add TargetRunnerConfigObserver

This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch shenlin/runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git

commit 2a24698066e51ca6bdc74625a09228339d162ea1
Author: 2011shenlin <20...@gmail.com>
AuthorDate: Wed Mar 15 17:51:40 2023 +0800

    feat:add TargetRunnerConfigObserver
---
 .../RocketMQConnectTargetRunnerAPIImpl.java        |  40 +--
 .../eventbridge/adapter/runtimer/Runtimer.java     |  24 +-
 .../adapter/runtimer/boot/EventBusListener.java    |  80 +++---
 .../adapter/runtimer/boot/EventRuleTransfer.java   |  82 ++++---
 .../adapter/runtimer/boot/EventTargetPusher.java   |  73 +++---
 .../runtimer/boot/listener/ListenerFactory.java    |   6 +-
 .../boot/listener/TargetRunnerListener.java        |  44 ++++
 ...erTargetEntity.java => TargetRunnerConfig.java} |   4 +-
 .../AbstractTargetRunnerConfigObserver.java        |  95 ++++++++
 .../service/PusherConfigManageServiceImpl.java     | 271 ---------------------
 ...ervice.java => TargetRunnerConfigObserver.java} |  69 +-----
 .../TargetRunnerConfigOnControllerObserver.java    |  48 ++++
 .../service/TargetRunnerConfigOnDBObserver.java    |  60 +++++
 .../service/TargetRunnerConfigOnFileObserver.java  |  92 +++++++
 14 files changed, 517 insertions(+), 471 deletions(-)

diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
index fda982c..8971028 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
@@ -20,11 +20,8 @@ package org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect;
 import com.alibaba.fastjson.JSON;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
-
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.Objects;
-
 import lombok.SneakyThrows;
 import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectSourceRunnerContext;
 import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectTargetRunnerContext;
@@ -32,7 +29,6 @@ import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.ActionStatus
 import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.CreateSinkConnectorRequest;
 import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.TransformRequest;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
 import org.apache.rocketmq.eventbridge.domain.common.enums.EventTargetStatusEnum;
 import org.apache.rocketmq.eventbridge.domain.model.Component;
 import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
@@ -45,13 +41,10 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
 
     private final RocketMQConnectClient rocketMQConnectClient;
 
-    private PusherConfigManageService pusherConfigManageService;
-
     public RocketMQConnectTargetRunnerAPIImpl(EventDataRepository eventDataRepository,
-        RocketMQConnectClient rocketMQConnectClient, PusherConfigManageService pusherConfigManageService) {
+        RocketMQConnectClient rocketMQConnectClient) {
         super(eventDataRepository);
         this.rocketMQConnectClient = rocketMQConnectClient;
-        this.pusherConfigManageService = pusherConfigManageService;
     }
 
     @SneakyThrows
@@ -64,14 +57,9 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
         TransformRequest filterTransform = this.buildEventBridgeFilterTransform(filterPattern);
         TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
         TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
-                sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        if(Objects.nonNull(pusherConfigManageService)){
-            pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue);
-        }else {
-            // todo delete
-            rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
-                    sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        }
+            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+        rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
+            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
         RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
         return new Gson().toJson(context);
     }
@@ -87,17 +75,11 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
         TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
         //create
         TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
-                sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        if(Objects.nonNull(pusherConfigManageService)){
-            pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue);
-        }else {
-            // todo delete
-            //stop
-            this.delete(runContext);
-
-            rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
-                    sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        }
+            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+        //stop
+        this.delete(runContext);
+        rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
+            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
         RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
         return new Gson().toJson(context);
     }
@@ -133,6 +115,7 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
 
     /**
      * init sink task config
+     *
      * @param name
      * @param topicName
      * @param sinkClass
@@ -140,7 +123,8 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
      * @param transforms
      * @return
      */
-    private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass, Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) {
+    private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass,
+        Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) {
         CreateSinkConnectorRequest request = new CreateSinkConnectorRequest();
         request.setName(name);
         request.setTopicName(topicName);
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
index 896885d..44745d2 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -25,7 +25,7 @@ import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -55,7 +55,7 @@ public class Runtimer extends ServiceThread{
 
     private ListenerFactory listenerFactory;
 
-    private PusherConfigManageService pusherConfigManageService;
+    private TargetRunnerConfigObserver targetRunnerConfigObserver;
 
     private Map<String, List<TargetKeyValue>> taskConfigs = new HashMap<>();
 
@@ -67,21 +67,21 @@ public class Runtimer extends ServiceThread{
 
     private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((Runnable r) -> new Thread(r, "RuntimerScheduledThread"));
 
-    public Runtimer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService configManageService) {
+    public Runtimer(Plugin plugin, ListenerFactory listenerFactory, TargetRunnerConfigObserver configManageService) {
         this.plugin = plugin;
         this.listenerFactory = listenerFactory;
-        this.pusherConfigManageService = configManageService;
+        this.targetRunnerConfigObserver = configManageService;
     }
 
     @PostConstruct
     public void initAndStart() {
         logger.info("init runtimer task config");
-        this.taskConfigs = pusherConfigManageService.getTaskConfigs();
-        listener = new EventBusListener(listenerFactory, pusherConfigManageService);
+        this.taskConfigs = targetRunnerConfigObserver.getTaskConfigs();
+        listener = new EventBusListener(listenerFactory, targetRunnerConfigObserver);
         listener.initOrUpdateListenConsumer(taskConfigs);
-        transfer = new EventRuleTransfer(plugin, listenerFactory, pusherConfigManageService);
+        transfer = new EventRuleTransfer(plugin, listenerFactory, targetRunnerConfigObserver);
         transfer.initOrUpdateTaskTransform(taskConfigs);
-        pusher = new EventTargetPusher(plugin, listenerFactory, pusherConfigManageService);
+        pusher = new EventTargetPusher(plugin, listenerFactory, targetRunnerConfigObserver);
         pusher.initOrUpdatePusherTask(taskConfigs);
         startRuntimer();
     }
@@ -105,13 +105,5 @@ public class Runtimer extends ServiceThread{
 
         pusher.start();
 
-        scheduledExecutorService.scheduleAtFixedRate(() -> {
-            try {
-                this.pusherConfigManageService.persist();
-            } catch (Exception e) {
-                logger.error("schedule persist config error.", e);
-            }
-        }, 500, 500, TimeUnit.MILLISECONDS);
-
     }
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
index 0c40235..8f3c707 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
@@ -25,31 +25,43 @@ import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.internal.DefaultKeyValue;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.QueueState;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.CollectionUtils;
 
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.*;
-
 /**
  * listen the event and offer to queue
+ *
  * @author artisan
  */
-public class EventBusListener extends ServiceThread {
+public class EventBusListener extends ServiceThread implements TargetRunnerListener {
 
     private Logger logger = LoggerFactory.getLogger(EventBusListener.class);
 
@@ -63,32 +75,33 @@ public class EventBusListener extends ServiceThread {
 
     private ListenerFactory listenerFactory;
 
-    private PusherConfigManageService pusherConfigManageService;
+    private TargetRunnerConfigObserver targetRunnerConfigObserver;
 
-    private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
+    private ExecutorService executorService = new ThreadPoolExecutor(20, 60, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
 
     private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000);
 
-    public EventBusListener(ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
+    public EventBusListener(ListenerFactory listenerFactory, TargetRunnerConfigObserver targetRunnerConfigObserver) {
         this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256);
         this.messageQueuesStateMap = new ConcurrentHashMap<>(256);
         this.listenerFactory = listenerFactory;
-        this.pusherConfigManageService = pusherConfigManageService;
-        this.pusherConfigManageService.registerListener(new ConsumerUpdateListenerImpl());
+        this.targetRunnerConfigObserver = targetRunnerConfigObserver;
+        this.targetRunnerConfigObserver.registerListener(this);
     }
 
     /**
      * init listen consumer
+     *
      * @param taskConfig
      */
-    public void initOrUpdateListenConsumer(Map<String, List<TargetKeyValue>> taskConfig){
-        if(MapUtils.isEmpty(taskConfig)){
+    public void initOrUpdateListenConsumer(Map<String, List<TargetKeyValue>> taskConfig) {
+        if (MapUtils.isEmpty(taskConfig)) {
             logger.warn("initListenConsumer by taskConfig param is empty");
             return;
         }
         List<TargetKeyValue> targetKeyValues = initTaskKeyInfo(taskConfig);
         this.topics.addAll(listenerFactory.parseTopicListByList(targetKeyValues));
-        for (String topic : topics){
+        for (String topic : topics) {
             DefaultLitePullConsumer pullConsumer = listenerFactory.initDefaultMQPullConsumer(topic);
             listenConsumer.add(pullConsumer);
         }
@@ -97,12 +110,13 @@ public class EventBusListener extends ServiceThread {
 
     /**
      * init all task config info
+     *
      * @param taskConfig
      * @return
      */
     private List<TargetKeyValue> initTaskKeyInfo(Map<String, List<TargetKeyValue>> taskConfig) {
         Set<TargetKeyValue> targetKeyValues = new HashSet<>();
-        for(String connectName : taskConfig.keySet()){
+        for (String connectName : taskConfig.keySet()) {
             targetKeyValues.addAll(taskConfig.get(connectName));
         }
         return Lists.newArrayList(targetKeyValues);
@@ -110,13 +124,13 @@ public class EventBusListener extends ServiceThread {
 
     @Override
     public void run() {
-        while (!stopped){
-            if(CollectionUtils.isEmpty(listenConsumer)){
+        while (!stopped) {
+            if (CollectionUtils.isEmpty(listenConsumer)) {
                 logger.info("current listen consumer is empty");
                 this.waitForRunning(1000);
                 continue;
             }
-            for(DefaultLitePullConsumer pullConsumer : listenConsumer) {
+            for (DefaultLitePullConsumer pullConsumer : listenConsumer) {
                 executorService.submit(() -> {
                     try {
                         List<MessageExt> messageExts = pullConsumer.poll(3000);
@@ -144,6 +158,7 @@ public class EventBusListener extends ServiceThread {
 
     /**
      * MessageExt convert to connect record
+     *
      * @param messageExt
      * @return
      */
@@ -175,14 +190,21 @@ public class EventBusListener extends ServiceThread {
     /**
      * consumer update listener
      */
-    class ConsumerUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
-
-        @Override
-        public void onConfigUpdate(PusherTargetEntity targetEntity) {
-            logger.info("consumer update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
-            Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
-            lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
-            initOrUpdateListenConsumer(lastTargetMap);
-        }
+    @Override
+    public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        logger.info("consumer update by new target config changed, target info -{}", JSON.toJSONString(targetRunnerConfig));
+        Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
+        lastTargetMap.put(targetRunnerConfig.getConnectName(), targetRunnerConfig.getTargetKeyValues());
+        initOrUpdateListenConsumer(lastTargetMap);
     }
+
+    @Override
+    public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+    }
+
+    @Override
+    public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+    }
+
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index f14fdec..be73b2a 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -19,44 +19,54 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
 
 import com.alibaba.fastjson.JSON;
 import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.*;
-
 /**
  * receive event and transfer the rule to pusher
  */
-public class EventRuleTransfer extends ServiceThread {
+public class EventRuleTransfer extends ServiceThread implements TargetRunnerListener {
 
     private static final Logger logger = LoggerFactory.getLogger(EventRuleTransfer.class);
 
     private ListenerFactory listenerFactory;
 
-    private PusherConfigManageService pusherConfigManageService;
+    private TargetRunnerConfigObserver targetRunnerConfigObserver;
 
     private Plugin plugin;
 
     Map<TargetKeyValue/*taskConfig*/, TransformEngine<ConnectRecord>/*taskTransform*/> taskTransformMap = new ConcurrentHashMap<>(20);
 
-    private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
+    private ExecutorService executorService = new ThreadPoolExecutor(20, 60, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
 
-    public EventRuleTransfer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
+    public EventRuleTransfer(Plugin plugin, ListenerFactory listenerFactory,
+        TargetRunnerConfigObserver targetRunnerConfigObserver) {
         this.plugin = plugin;
         this.listenerFactory = listenerFactory;
-        this.pusherConfigManageService = pusherConfigManageService;
-        this.pusherConfigManageService.registerListener(new TransformUpdateListenerImpl());
+        this.targetRunnerConfigObserver = targetRunnerConfigObserver;
+        this.targetRunnerConfigObserver.registerListener(this);
     }
 
-    public void initOrUpdateTaskTransform(Map<String, List<TargetKeyValue>> taskConfig){
+    public void initOrUpdateTaskTransform(Map<String, List<TargetKeyValue>> taskConfig) {
         this.taskTransformMap.putAll(initSinkTaskTransformInfo(taskConfig));
     }
 
@@ -67,9 +77,9 @@ public class EventRuleTransfer extends ServiceThread {
 
     @Override
     public void run() {
-        while (!stopped){
+        while (!stopped) {
             ConnectRecord eventRecord = listenerFactory.takeEventRecord();
-            if(Objects.isNull(eventRecord)){
+            if (Objects.isNull(eventRecord)) {
                 logger.info("listen eventRecord is empty, continue by curTime - {}", System.currentTimeMillis());
                 this.waitForRunning(1000);
                 continue;
@@ -77,16 +87,16 @@ public class EventRuleTransfer extends ServiceThread {
             executorService.submit(() -> {
                 // extension add sub
                 // rule - target
-                for (TargetKeyValue targetKeyValue : taskTransformMap.keySet()){
+                for (TargetKeyValue targetKeyValue : taskTransformMap.keySet()) {
                     // add threadPool for cup task
                     // attention coreSize
                     TransformEngine<ConnectRecord> transformEngine = taskTransformMap.get(targetKeyValue);
                     ConnectRecord transformRecord = transformEngine.doTransforms(eventRecord);
-                    if(Objects.isNull(transformRecord)){
+                    if (Objects.isNull(transformRecord)) {
                         continue;
                     }
                     // a bean for maintain
-                    Map<TargetKeyValue,ConnectRecord> targetMap = new HashMap<>();
+                    Map<TargetKeyValue, ConnectRecord> targetMap = new HashMap<>();
                     targetMap.put(targetKeyValue, transformRecord);
                     listenerFactory.offerTargetTaskQueue(targetMap);
 
@@ -101,20 +111,20 @@ public class EventRuleTransfer extends ServiceThread {
     }
 
     /**
-     * Init sink task transform map
-     * key: task config
-     * value: transformEngine
+     * Init sink task transform map key: task config value: transformEngine
+     *
      * @param taskConfig
      * @return
      */
-    private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(Map<String, List<TargetKeyValue>> taskConfig) {
+    private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(
+        Map<String, List<TargetKeyValue>> taskConfig) {
         Map<TargetKeyValue, TransformEngine<ConnectRecord>> curTaskTransformMap = new HashMap<>();
         Set<TargetKeyValue> allTaskKeySet = new HashSet<>();
-        for(String connectName : taskConfig.keySet()){
+        for (String connectName : taskConfig.keySet()) {
             List<TargetKeyValue> taskKeyList = taskConfig.get(connectName);
             allTaskKeySet.addAll(new HashSet<>(taskKeyList));
         }
-        for(TargetKeyValue keyValue : allTaskKeySet){
+        for (TargetKeyValue keyValue : allTaskKeySet) {
             TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(keyValue, plugin);
             curTaskTransformMap.put(keyValue, transformChain);
         }
@@ -125,14 +135,20 @@ public class EventRuleTransfer extends ServiceThread {
     /**
      * transform update listener
      */
-    class TransformUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
-
-        @Override
-        public void onConfigUpdate(PusherTargetEntity targetEntity) {
-            logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
-            Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
-            lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
-            initOrUpdateTaskTransform(lastTargetMap);
-        }
+    @Override
+    public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetRunnerConfig));
+        Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
+        lastTargetMap.put(targetRunnerConfig.getConnectName(), targetRunnerConfig.getTargetKeyValues());
+        initOrUpdateTaskTransform(lastTargetMap);
+    }
+
+    @Override
+    public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+    }
+
+    @Override
+    public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
     }
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
index d324fe4..bd9ddb0 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -22,27 +22,32 @@ import com.google.common.collect.Lists;
 import io.netty.util.internal.ConcurrentSet;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.commons.collections.MapUtils;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-
 /**
  * event target push to sink task
+ *
  * @author artisan
  */
-public class EventTargetPusher extends ServiceThread {
+public class EventTargetPusher extends ServiceThread implements TargetRunnerListener {
 
     private static final Logger logger = LoggerFactory.getLogger(EventTargetPusher.class);
 
@@ -56,29 +61,31 @@ public class EventTargetPusher extends ServiceThread {
 
     private ListenerFactory listenerFactory;
 
-    private PusherConfigManageService pusherConfigManageService;
+    private TargetRunnerConfigObserver targetRunnerConfigObserver;
 
     private List<SinkTask> pusherTasks = new CopyOnWriteArrayList<>();
 
-    public EventTargetPusher(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
+    public EventTargetPusher(Plugin plugin, ListenerFactory listenerFactory,
+        TargetRunnerConfigObserver targetRunnerConfigObserver) {
         this.plugin = plugin;
         this.listenerFactory = listenerFactory;
-        this.pusherConfigManageService = pusherConfigManageService;
-        this.pusherConfigManageService.registerListener(new TargetUpdateListenerImpl());
+        this.targetRunnerConfigObserver = targetRunnerConfigObserver;
+        this.targetRunnerConfigObserver.registerListener(this);
     }
 
     /**
      * init running tasks
+     *
      * @param taskConfig
      */
-    public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig){
+    public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig) {
         Set<TargetKeyValue> taskProperty = new HashSet<>();
-        for(String connectName : taskConfig.keySet()){
+        for (String connectName : taskConfig.keySet()) {
             List<TargetKeyValue> targetKeyValues = taskConfig.get(connectName);
             taskProperty.addAll(new HashSet<>(targetKeyValues));
         }
-        for(TargetKeyValue targetKeyValue : taskProperty){
-            try{
+        for (TargetKeyValue targetKeyValue : taskProperty) {
+            try {
                 String taskClass = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
                 ClassLoader loader = plugin.getPluginClassLoader(taskClass);
                 Class taskClazz;
@@ -98,7 +105,7 @@ public class EventTargetPusher extends ServiceThread {
                     Plugin.compareAndSwapLoaders(loader);
                 }
                 logger.info("init target task succeed, target key - {}", JSON.toJSONString(targetKeyValue));
-            }catch (Exception exception){
+            } catch (Exception exception) {
                 exception.printStackTrace();
             }
         }
@@ -106,9 +113,9 @@ public class EventTargetPusher extends ServiceThread {
 
     @Override
     public void run() {
-        while (!stopped){
+        while (!stopped) {
             Map<TargetKeyValue, ConnectRecord> taskPusher = listenerFactory.takeTargetMap();
-            if(MapUtils.isEmpty(taskPusher)){
+            if (MapUtils.isEmpty(taskPusher)) {
                 logger.info("current target pusher is empty");
                 this.waitForRunning(1000);
                 continue;
@@ -121,8 +128,8 @@ public class EventTargetPusher extends ServiceThread {
             // also add in ConnectRecord class system property
             String taskPushName = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
             // add thread pool
-            for(SinkTask sinkTask : pusherTasks){
-                if(sinkTask.getClass().getName().equals(taskPushName)){
+            for (SinkTask sinkTask : pusherTasks) {
+                if (sinkTask.getClass().getName().equals(taskPushName)) {
                     sinkTask.put(Lists.newArrayList(taskPusher.get(targetKeyValue)));
                 }
             }
@@ -137,14 +144,22 @@ public class EventTargetPusher extends ServiceThread {
     /**
      * target update listener
      */
-    class TargetUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
-
-        @Override
-        public void onConfigUpdate(PusherTargetEntity targetEntity) {
-            logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
-            Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
-            lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
-            initOrUpdatePusherTask(lastTargetMap);
-        }
+
+    @Override
+    public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetRunnerConfig));
+        Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
+        lastTargetMap.put(targetRunnerConfig.getConnectName(), targetRunnerConfig.getTargetKeyValues());
+        initOrUpdatePusherTask(lastTargetMap);
     }
+
+    @Override
+    public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+    }
+
+    @Override
+    public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+    }
+
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
index 07f010e..44c29bb 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
@@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
@@ -52,7 +52,7 @@ public class ListenerFactory {
 
     public static final String QUEUE_OFFSET = "queueOffset";
 
-    private BlockingQueue<PusherTargetEntity> pusherTargetQueue = new LinkedBlockingQueue<>(1000);
+    private BlockingQueue<TargetRunnerConfig> pusherTargetQueue = new LinkedBlockingQueue<>(1000);
 
     private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000);
 
@@ -76,7 +76,7 @@ public class ListenerFactory {
         return consumer;
     }
 
-    public PusherTargetEntity takeTaskConfig(){
+    public TargetRunnerConfig takeTaskConfig(){
         if(pusherTargetQueue.isEmpty()){
             return null;
         }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
new file mode 100644
index 0000000..f528b7b
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+public interface TargetRunnerListener {
+
+    /**
+     * Call when add new target runner to runtimer.
+     *
+     * @param targetRunnerConfig
+     */
+    void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+    /**
+     * Call when the old target runner updated.
+     *
+     * @param targetRunnerConfig
+     */
+    void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+    /**
+     * Call when the old target runner deleted from runtimer.
+     *
+     * @param targetRunnerConfig
+     */
+    void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
similarity index 92%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java
rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
index dc7ced4..52525cf 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
@@ -26,7 +26,7 @@ import java.util.List;
  * pusher target key config
  */
 @Data
-public class PusherTargetEntity implements Serializable {
+public class TargetRunnerConfig implements Serializable {
 
     private String connectName;
 
@@ -35,7 +35,7 @@ public class PusherTargetEntity implements Serializable {
     @Override
     public boolean equals(Object object){
         if (object != null && object.getClass() == this.getClass()) {
-            PusherTargetEntity targetEntity = (PusherTargetEntity) object;
+            TargetRunnerConfig targetEntity = (TargetRunnerConfig) object;
             return this.connectName.equals(targetEntity.getConnectName())
                     && this.targetKeyValues.size() == targetEntity.getTargetKeyValues().size()
                     && this.targetKeyValues.containsAll(targetEntity.getTargetKeyValues());
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
new file mode 100644
index 0000000..619bc63
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import com.google.common.collect.Sets;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+@Service
+public abstract class AbstractTargetRunnerConfigObserver implements TargetRunnerConfigObserver {
+
+    private Set<TargetRunnerConfig> targetRunnerConfigs = Sets.newHashSet();
+
+    /**
+     * All listeners to trigger while config change.
+     */
+    private Set<TargetRunnerListener> targetRunnerConfigListeners = new HashSet<>();
+
+    public Set<TargetRunnerConfig> getTargetRunnerConfig() {
+        return targetRunnerConfigs;
+    }
+
+    public abstract Set<TargetRunnerConfig> getLatestTargetRunnerConfig();
+
+    @Override
+    public void registerListener(TargetRunnerListener listener) {
+        this.targetRunnerConfigListeners.add(listener);
+    }
+
+    void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        this.targetRunnerConfigs.add(targetRunnerConfig);
+        if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+            return;
+        }
+        for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+            listener.onAddTargetRunner(targetRunnerConfig);
+        }
+    }
+
+    /**
+     * Call when the old target runner updated.
+     *
+     * @param targetRunnerConfig
+     */
+    void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        this.targetRunnerConfigs.add(targetRunnerConfig);
+        if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+            return;
+        }
+        for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+            listener.onUpdateTargetRunner(targetRunnerConfig);
+        }
+    }
+
+    /**
+     * Call when the old target runner deleted from runtimer.
+     *
+     * @param targetRunnerConfig
+     */
+    void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        this.targetRunnerConfigs.remove(targetRunnerConfig);
+        if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+            return;
+        }
+        for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+            listener.onDeleteTargetRunner(targetRunnerConfig);
+        }
+    }
+
+    public Map<String, List<TargetKeyValue>> getTaskConfigs() {
+        return null;
+    }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
deleted file mode 100644
index 58a66fd..0000000
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
-
-import com.google.common.collect.Lists;
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.component.connector.Connector;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.FilePathConfigUtil;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.FileBaseKeyValueStore;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.KeyValueStore;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.JsonConverter;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.ListConverter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-import org.springframework.util.CollectionUtils;
-
-import javax.annotation.PostConstruct;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-@Service
-public class PusherConfigManageServiceImpl implements PusherConfigManageService {
-
-    /**
-     * plugin for recognize class loader
-     */
-    private Plugin plugin;
-
-    /**
-     * Current connector configs in the store.
-     */
-    private KeyValueStore<String, TargetKeyValue> connectorKeyValueStore;
-
-    /**
-     * Current task configs in the store.
-     */
-    private KeyValueStore<String, List<TargetKeyValue>> taskKeyValueStore;
-
-    /**
-     * All listeners to trigger while config change.
-     */
-    private Set<TargetConfigUpdateListener> targetConfigUpdateListeners;
-
-    private Set<String> connectTopicNames;
-
-    @Value("${runtimer.storePathRootDir:}")
-    private String storeRootPath;
-
-    public PusherConfigManageServiceImpl(Plugin plugin){
-        this.plugin = plugin;
-        this.connectTopicNames = new CopyOnWriteArraySet<>();
-        this.targetConfigUpdateListeners = new HashSet<>();
-    }
-
-    @PostConstruct
-    public void initStoreKeyValue(){
-        this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
-                FilePathConfigUtil.getConnectorConfigPath(this.storeRootPath),
-                new JsonConverter(),
-                new JsonConverter(TargetKeyValue.class));
-        this.taskKeyValueStore = new FileBaseKeyValueStore<>(
-                FilePathConfigUtil.getTaskConfigPath(this.storeRootPath),
-                new JsonConverter(),
-                new ListConverter(TargetKeyValue.class));
-        this.connectorKeyValueStore.load();
-        this.taskKeyValueStore.load();
-    }
-
-    /**
-     * get all connector configs enabled
-     *
-     * @return
-     */
-    @Override
-    public Map<String, TargetKeyValue> getConnectorConfigs() {
-        Map<String, TargetKeyValue> result = new HashMap<>();
-        Map<String, TargetKeyValue> connectorConfigs = connectorKeyValueStore.getKVMap();
-        for (String connectorName : connectorConfigs.keySet()) {
-            TargetKeyValue config = connectorConfigs.get(connectorName);
-            if (0 != config.getInt(RuntimerConfigDefine.CONFIG_DELETED)) {
-                continue;
-            }
-            result.put(connectorName, config);
-        }
-        return result;
-    }
-
-    @Override
-    public String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception {
-        TargetKeyValue exist = connectorKeyValueStore.get(connectorName);
-        if (null != exist) {
-            Long updateTimestamp = exist.getLong(RuntimerConfigDefine.UPDATE_TIMESTAMP);
-            if (null != updateTimestamp) {
-                configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, updateTimestamp);
-            }
-        }
-        if (configs.equals(exist)) {
-            return "Connector with same config already exist.";
-        }
-
-        Long currentTimestamp = System.currentTimeMillis();
-        configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
-        for (String requireConfig : RuntimerConfigDefine.REQUEST_CONFIG) {
-            if (!configs.containsKey(requireConfig)) {
-                return "Request config key: " + requireConfig;
-            }
-        }
-
-        String connectorClass = configs.getString(RuntimerConfigDefine.CONNECTOR_CLASS);
-        ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
-        Class clazz;
-        if (null != classLoader) {
-            clazz = Class.forName(connectorClass, true, classLoader);
-        } else {
-            clazz = Class.forName(connectorClass);
-        }
-        final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
-        connector.validate(configs);
-        connector.init(configs);
-        connectorKeyValueStore.put(connectorName, configs);
-        recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
-        return "";
-    }
-
-    @Override
-    public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs) {
-        int maxTask = configs.getInt(RuntimerConfigDefine.MAX_TASK, 1);
-        List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
-        List<TargetKeyValue> converterdConfigs = new ArrayList<>();
-        for (KeyValue keyValue : taskConfigs) {
-            TargetKeyValue newKeyValue = new TargetKeyValue();
-            for (String key : keyValue.keySet()) {
-                newKeyValue.put(key, keyValue.getString(key));
-            }
-            newKeyValue.put(RuntimerConfigDefine.TASK_CLASS, connector.taskClass().getName());
-            newKeyValue.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
-
-            newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
-            newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAMES));
-            Set<String> connectConfigKeySet = configs.keySet();
-            for (String connectConfigKey : connectConfigKeySet) {
-                if (connectConfigKey.startsWith(RuntimerConfigDefine.TRANSFORMS)) {
-                    newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
-                }
-            }
-            converterdConfigs.add(newKeyValue);
-            connectTopicNames.add(configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
-        }
-        putTaskConfigs(connectorName, converterdConfigs);
-    }
-
-    @Override
-    public void removeConnectorConfig(String connectorName) {
-        TargetKeyValue config = connectorKeyValueStore.get(connectorName);
-        if(Objects.isNull(config)){
-            return;
-        }
-        config.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, System.currentTimeMillis());
-        config.put(RuntimerConfigDefine.CONFIG_DELETED, 1);
-        List<TargetKeyValue> taskConfigList = taskKeyValueStore.get(connectorName);
-        taskConfigList.add(config);
-        connectorKeyValueStore.put(connectorName, config);
-        putTaskConfigs(connectorName, taskConfigList);
-    }
-
-    @Override
-    public Map<String, List<TargetKeyValue>> getTaskConfigs() {
-        Map<String, List<TargetKeyValue>> result = new HashMap<>();
-        Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap();
-        Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs();
-        for (String connectorName : taskConfigs.keySet()) {
-            if (!filteredConnector.containsKey(connectorName)) {
-                continue;
-            }
-            result.put(connectorName, taskConfigs.get(connectorName));
-        }
-        return result;
-    }
-
-    @Override
-    public Set<PusherTargetEntity> getTargetInfo() {
-        Set<PusherTargetEntity> result = new HashSet<>();
-        Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap();
-        Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs();
-        for (String connectorName : taskConfigs.keySet()) {
-            if (!filteredConnector.containsKey(connectorName)) {
-                continue;
-            }
-            PusherTargetEntity targetEntity = new PusherTargetEntity();
-            targetEntity.setConnectName(connectorName);
-            targetEntity.setTargetKeyValues(taskConfigs.get(connectorName));
-            result.add(targetEntity);
-        }
-        return result;
-    }
-
-    @Override
-    public List<String> getConnectTopics(){
-        if(CollectionUtils.isEmpty(connectTopicNames)){
-            return Lists.newArrayList();
-        }
-        return Lists.newArrayList(connectTopicNames);
-    }
-
-    @Override
-    public void persist() {
-        this.connectorKeyValueStore.persist();
-        this.taskKeyValueStore.persist();
-    }
-
-    @Override
-    public void registerListener(TargetConfigUpdateListener listener) {
-        this.targetConfigUpdateListeners.add(listener);
-    }
-
-    /**
-     * put target task key config for update
-     * @param connectorName
-     * @param configs
-     */
-    private void putTaskConfigs(String connectorName, List<TargetKeyValue> configs) {
-        List<TargetKeyValue> exist = taskKeyValueStore.get(connectorName);
-        if (null != exist && exist.size() > 0) {
-            taskKeyValueStore.remove(connectorName);
-        }
-        taskKeyValueStore.put(connectorName, configs);
-        PusherTargetEntity targetEntity = new PusherTargetEntity();
-        targetEntity.setConnectName(connectorName);
-        targetEntity.setTargetKeyValues(configs);
-        triggerListener(targetEntity);
-        persistStore();
-    }
-
-    private void persistStore() {
-
-    }
-
-    /**
-     * trigger new target task config for update
-     * @param pusherTargetEntity
-     */
-    private void triggerListener(PusherTargetEntity pusherTargetEntity) {
-        if (CollectionUtils.isEmpty(this.targetConfigUpdateListeners)) {
-            return;
-        }
-
-        for (TargetConfigUpdateListener listener : this.targetConfigUpdateListeners) {
-            listener.onConfigUpdate(pusherTargetEntity);
-        }
-    }
-
-}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
similarity index 50%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java
rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
index 88a2001..c6c7670 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
@@ -17,82 +17,31 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
 
-import io.openmessaging.connector.api.component.connector.Connector;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 
 /**
  * manage the pusher connector/task config info
  */
-public interface PusherConfigManageService {
-
-    /**
-     * Get all connector configs
-     *
-     * @return
-     */
-    Map<String, TargetKeyValue> getConnectorConfigs();
-
-    /**
-     * Put the configs.
-     *
-     * @param connectorName
-     * @param configs
-     * @return
-     * @throws Exception
-     */
-    String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception;
-
-    /**
-     * Remove the connector
-     *
-     * @param connectorName
-     */
-    void removeConnectorConfig(String connectorName);
-
-    void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs);
+public interface TargetRunnerConfigObserver {
 
     /**
-     * Get all Task configs.
-     *
+     * Get the target runner config of runtimer.
      * @return
      */
-    Map<String, List<TargetKeyValue>> getTaskConfigs();
-
-    /**
-     * get target info
-     * @return
-     */
-    Set<PusherTargetEntity> getTargetInfo();
-
-    /**
-     * Get all topics
-     * @return
-     */
-    List<String> getConnectTopics();
-
-    /**
-     * Persist all the configs in a store.
-     */
-    void persist();
+    Set<TargetRunnerConfig> getTargetRunnerConfig();
 
     /**
      * Register a listener to listen all config update operations.
      *
      * @param listener
      */
-    void registerListener(TargetConfigUpdateListener listener);
-
-    interface TargetConfigUpdateListener {
-
-        /**
-         * Invoke while connector config changed.
-         */
-        void onConfigUpdate(PusherTargetEntity targetEntity);
-    }
+    void registerListener(TargetRunnerListener listener);
 
+    @Deprecated
+    Map<String, List<TargetKeyValue>> getTaskConfigs();
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java
new file mode 100644
index 0000000..f51ac50
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+@Slf4j
+public class TargetRunnerConfigOnControllerObserver extends AbstractTargetRunnerConfigObserver {
+
+    @Override
+    public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+        return Sets.newHashSet();
+    }
+
+    public void add(TargetRunnerConfig targetRunnerConfig) {
+        super.onAddTargetRunner(targetRunnerConfig);
+    }
+
+    public void update(TargetRunnerConfig targetRunnerConfig) {
+        super.onUpdateTargetRunner(targetRunnerConfig);
+    }
+
+    public void delete(TargetRunnerConfig targetRunnerConfig) {
+        super.onDeleteTargetRunner(targetRunnerConfig);
+    }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
new file mode 100644
index 0000000..3d847a1
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+
+@Slf4j
+public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigObserver {
+
+    private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.newThreadFactory("TargetRunnerConfigOnDBObserver", false));
+
+    public TargetRunnerConfigOnDBObserver(Plugin plugin) {
+        this.addListen(this);
+    }
+
+    @Override
+    public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+        return null;
+    }
+
+    public void addListen(
+        TargetRunnerConfigOnDBObserver pusherConfigOnFileService) {
+        service.scheduleAtFixedRate(() -> {
+            try {
+                Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
+                Set<TargetRunnerConfig> last = super.getTargetRunnerConfig();
+                TargetRunnerConfig changed = null;
+                super.onAddTargetRunner(changed);
+                super.onUpdateTargetRunner(changed);
+                super.onDeleteTargetRunner(changed);
+            } catch (Throwable e) {
+                log.error("Watch failed.", e);
+            }
+        }, 0, 3, TimeUnit.SECONDS);
+    }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
new file mode 100644
index 0000000..27269b8
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+import org.springframework.beans.factory.annotation.Value;
+
+@Slf4j
+public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver {
+
+    @Value("${runtimer.storePathRootDir:}")
+    private String storeRootPath;
+
+    @Value("${runtimer.store.targetRunner.config:targetRunner-config}")
+    private String fileName;
+
+    private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.newThreadFactory("TargetRunnerConfigOnFileObserver", false));
+
+    public TargetRunnerConfigOnFileObserver(Plugin plugin) {
+        this.addListen(storeRootPath, fileName, this);
+    }
+
+    @Override
+    public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+        return null;
+    }
+
+    public void addListen(String pathStr, String fileName,
+        TargetRunnerConfigOnFileObserver pusherConfigOnFileService) {
+        log.info("Watching task file changing:{}", pathStr + fileName);
+        service.scheduleAtFixedRate(() -> {
+            try {
+                WatchService watchService = FileSystems.getDefault()
+                    .newWatchService();
+                Path path = Paths.get(pathStr);
+                path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
+                    StandardWatchEventKinds.ENTRY_MODIFY);
+                WatchKey watchKey;
+                while (true) {
+                    watchKey = watchService.take();
+                    if (watchKey != null && !watchKey.pollEvents()
+                        .isEmpty()) {
+                        log.info("Watched the file changed events.");
+                        pusherConfigOnFileService.diff();
+                    }
+                    watchKey.reset();
+                }
+            } catch (Throwable e) {
+                log.error("Watch file failed.", e);
+            }
+        }, 0, 3, TimeUnit.SECONDS);
+    }
+
+    public void diff() {
+        Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
+        Set<TargetRunnerConfig> last = super.getTargetRunnerConfig();
+        TargetRunnerConfig changed = null;
+        super.onAddTargetRunner(changed);
+        super.onUpdateTargetRunner(changed);
+        super.onDeleteTargetRunner(changed);
+    }
+
+}
\ No newline at end of file