You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2023/03/31 03:34:46 UTC

[rocketmq-eventbridge] branch runtimer updated: Optimize Event Circulator Context (#70)

This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/runtimer by this push:
     new 23529d4  Optimize Event Circulator Context (#70)
23529d4 is described below

commit 23529d43ac8baf91d281ec758e0b00e683cc58f0
Author: Artisan <Xi...@outlook.com>
AuthorDate: Fri Mar 31 11:34:41 2023 +0800

    Optimize Event Circulator Context (#70)
    
    Optimize Event Circulator Context
---
 adapter/runtimer/pom.xml                           |   6 -
 .../eventbridge/adapter/runtimer/Runtimer.java     |  51 ++--
 .../adapter/runtimer/boot/EventBusListener.java    |  32 ++-
 .../adapter/runtimer/boot/EventRuleTransfer.java   |  36 +--
 .../adapter/runtimer/boot/EventTargetPusher.java   | 110 ++------
 .../runtimer/boot/listener/CirculatorContext.java  | 201 ++++++++++++++
 .../runtimer/boot/listener/EventSubscriber.java    |  18 +-
 .../runtimer/boot/listener/ListenerFactory.java    | 294 ---------------------
 .../boot/listener/RocketMQEventSubscriber.java     | 264 +++++++++++++++++-
 .../boot/listener/TargetRunnerListener.java        |   1 +
 .../runtimer/common/entity/TargetKeyValue.java     |  32 ++-
 .../enums/RefreshTypeEnum.java}                    |  34 +--
 .../runtimer/config/RuntimerConfigDefine.java      |   2 +
 .../service/TargetRunnerConfigOnFileObserver.java  |  21 +-
 .../src/main/resources/runtimer.properties         |  23 ++
 pom.xml                                            |   1 +
 16 files changed, 614 insertions(+), 512 deletions(-)

diff --git a/adapter/runtimer/pom.xml b/adapter/runtimer/pom.xml
index 49578b4..1e7c7ed 100644
--- a/adapter/runtimer/pom.xml
+++ b/adapter/runtimer/pom.xml
@@ -92,12 +92,6 @@
             <version>RELEASE</version>
             <scope>compile</scope>
         </dependency>
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter</artifactId>
-            <version>RELEASE</version>
-            <scope>compile</scope>
-        </dependency>
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
index 682b830..f871e08 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -17,26 +17,22 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.PostConstruct;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventBusListener;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventRuleTransfer;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventTargetPusher;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.RocketMQEventSubscriber;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.AbstractTargetRunnerConfigObserver;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnFileObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * event bridge runtimer
  *
@@ -49,34 +45,25 @@ public class Runtimer {
 
     private AtomicReference<RuntimerState> runtimerState;
 
-    private Plugin plugin;
-
-    private ListenerFactory listenerFactory;
-
-    private TargetRunnerConfigObserver targetRunnerConfigObserver;
-
-    private Map<String, List<TargetKeyValue>> taskConfigs = new HashMap<>();
-
-    private EventBusListener listener;
-
-    private EventRuleTransfer transfer;
-
-    private EventTargetPusher pusher;
+    private CirculatorContext circulatorContext;
 
-    private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((Runnable r) -> new Thread(r, "RuntimerScheduledThread"));
+    private AbstractTargetRunnerConfigObserver runnerConfigObserver;
 
-    public Runtimer(Plugin plugin, ListenerFactory listenerFactory, TargetRunnerConfigObserver configManageService) {
-        this.plugin = plugin;
-        this.listenerFactory = listenerFactory;
-        this.targetRunnerConfigObserver = configManageService;
+    public Runtimer(CirculatorContext circulatorContext) {
+        this.circulatorContext = circulatorContext;
+        this.runnerConfigObserver = new TargetRunnerConfigOnFileObserver();
     }
 
     @PostConstruct
     public void initAndStart() {
         logger.info("init runtimer task config");
-        new EventBusListener(listenerFactory, new RocketMQEventSubscriber(listenerFactory)).start();
-        new EventRuleTransfer(listenerFactory).start();
-        new EventTargetPusher(listenerFactory).start();
+        circulatorContext.initListenerMetadata(runnerConfigObserver.getTargetRunnerConfig());
+        EventSubscriber eventSubscriber = new RocketMQEventSubscriber(runnerConfigObserver);
+        runnerConfigObserver.registerListener(circulatorContext);
+        runnerConfigObserver.registerListener(eventSubscriber);
+        new EventBusListener(circulatorContext, eventSubscriber).start();
+        new EventRuleTransfer(circulatorContext).start();
+        new EventTargetPusher(circulatorContext).start();
         startRuntimer();
     }
 
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
index bc44668..d9227d0 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
@@ -18,10 +18,14 @@
 package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
 
 import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
 
 /**
  * listen the event and offer to queue
@@ -30,25 +34,35 @@ import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
  */
 public class EventBusListener extends ServiceThread {
 
-    private ListenerFactory listenerFactory;
+    private static final Logger logger = LoggerFactory.getLogger(EventBusListener.class);
+
+    private CirculatorContext circulatorContext;
 
     private EventSubscriber eventSubscriber;
 
-    public EventBusListener(ListenerFactory listenerFactory,
-        EventSubscriber eventSubscriber) {
-        this.listenerFactory = listenerFactory;
+    public EventBusListener(CirculatorContext circulatorContext, EventSubscriber eventSubscriber) {
+        this.circulatorContext = circulatorContext;
         this.eventSubscriber = eventSubscriber;
     }
 
     @Override
     public void run() {
         while (!stopped) {
-            List<ConnectRecord> recordList = eventSubscriber.pull();
-            listenerFactory.offerEventRecords(recordList);
+            try{
+                List<ConnectRecord> recordList = eventSubscriber.pull();
+                if(CollectionUtils.isEmpty(recordList)){
+                    this.waitForRunning(1000);
+                    continue;
+                }
+                circulatorContext.offerEventRecords(recordList);
+            }catch (Exception exception) {
+                logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception);
+            }
         }
     }
 
-    @Override public String getServiceName() {
+    @Override
+    public String getServiceName() {
         return EventBusListener.class.getSimpleName();
     }
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index eaee64c..f9492f6 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -19,22 +19,17 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
 
 import com.alibaba.fastjson.JSON;
 import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * receive event and transfer the rule to pusher
@@ -43,12 +38,12 @@ public class EventRuleTransfer extends ServiceThread {
 
     private static final Logger logger = LoggerFactory.getLogger(EventRuleTransfer.class);
 
-    private ListenerFactory listenerFactory;
+    private CirculatorContext circulatorContext;
 
     private ExecutorService executorService = new ThreadPoolExecutor(20, 60, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
 
-    public EventRuleTransfer(ListenerFactory listenerFactory) {
-        this.listenerFactory = listenerFactory;
+    public EventRuleTransfer(CirculatorContext circulatorContext) {
+        this.circulatorContext = circulatorContext;
     }
 
     @Override
@@ -59,8 +54,8 @@ public class EventRuleTransfer extends ServiceThread {
     @Override
     public void run() {
         while (!stopped) {
-            //TODO JAVA8 并发流处理
-            ConnectRecord eventRecord = listenerFactory.takeEventRecord();
+            // add CompletableFuture
+            ConnectRecord eventRecord = circulatorContext.takeEventRecord();
             if (Objects.isNull(eventRecord)) {
                 logger.info("listen eventRecord is empty, continue by curTime - {}", System.currentTimeMillis());
                 this.waitForRunning(1000);
@@ -69,22 +64,17 @@ public class EventRuleTransfer extends ServiceThread {
             executorService.submit(() -> {
                 // extension add sub
                 // rule - target
-                listenerFactory.getTaskTransformMap().entrySet().forEach(entry -> {
+                circulatorContext.getTaskTransformMap().entrySet().forEach(entry -> {
                     TransformEngine<ConnectRecord> transformEngine = entry.getValue();
                     ConnectRecord transformRecord = transformEngine.doTransforms(eventRecord);
                     if (Objects.isNull(transformRecord)) {
                         return;
                     }
                     // a bean for maintain
-                    listenerFactory.offerTargetTaskQueue(transformRecord);
+                    circulatorContext.offerTargetTaskQueue(transformRecord);
                     logger.debug("offer target task queue succeed, targetMap - {}", JSON.toJSONString(transformRecord));
                 });
-
-
             });
         }
     }
-
-
-
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
index 35c6bb2..9c17d2a 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -19,29 +19,17 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
 
 import com.alibaba.fastjson.JSON;
 import com.google.common.collect.Lists;
-import io.netty.util.internal.ConcurrentSet;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import org.apache.commons.collections.MapUtils;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.Objects;
+
 /**
  * event target push to sink task
  *
@@ -51,84 +39,30 @@ public class EventTargetPusher extends ServiceThread{
 
     private static final Logger logger = LoggerFactory.getLogger(EventTargetPusher.class);
 
-    private Set<Runnable> runningTasks = new ConcurrentSet<>();
-
-    private Set<Runnable> errorTasks = new ConcurrentSet<>();
-
-    private Set<Runnable> stoppedTasks = new ConcurrentSet<>();
-
-    private Plugin plugin;
-
-    private ListenerFactory listenerFactory;
-
-    private TargetRunnerConfigObserver targetRunnerConfigObserver;
-
-    private List<SinkTask> pusherTasks = new CopyOnWriteArrayList<>();
+    private CirculatorContext circulatorContext;
 
-    public EventTargetPusher(ListenerFactory listenerFactory) {
-        this.listenerFactory = listenerFactory;
-    }
-
-    /**
-     * init running tasks
-     *
-     * @param taskConfig
-     */
-    public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig) {
-        Set<TargetKeyValue> taskProperty = new HashSet<>();
-        for (String connectName : taskConfig.keySet()) {
-            List<TargetKeyValue> targetKeyValues = taskConfig.get(connectName);
-            taskProperty.addAll(new HashSet<>(targetKeyValues));
-        }
-        for (TargetKeyValue targetKeyValue : taskProperty) {
-            try {
-                String taskClass = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
-                ClassLoader loader = plugin.getPluginClassLoader(taskClass);
-                Class taskClazz;
-                boolean isolationFlag = false;
-                if (loader instanceof PluginClassLoader) {
-                    taskClazz = ((PluginClassLoader) loader).loadClass(taskClass, false);
-                    isolationFlag = true;
-                } else {
-                    taskClazz = Class.forName(taskClass);
-                }
-                SinkTask sinkTask = (SinkTask) taskClazz.getDeclaredConstructor().newInstance();
-                sinkTask.init(targetKeyValue);
-                PusherTaskContext sinkTaskContext = new PusherTaskContext(targetKeyValue);
-                sinkTask.start(sinkTaskContext);
-                pusherTasks.add(sinkTask);
-                if (isolationFlag) {
-                    Plugin.compareAndSwapLoaders(loader);
-                }
-                logger.info("init target task succeed, target key - {}", JSON.toJSONString(targetKeyValue));
-            } catch (Exception exception) {
-                exception.printStackTrace();
-            }
-        }
+    public EventTargetPusher(CirculatorContext circulatorContext) {
+        this.circulatorContext = circulatorContext;
     }
 
     @Override
     public void run() {
         while (!stopped) {
-//            Map<TargetKeyValue, ConnectRecord> taskPusher = listenerFactory.takeTargetMap();
-//            if (MapUtils.isEmpty(taskPusher)) {
-//                logger.info("current target pusher is empty");
-//                this.waitForRunning(1000);
-//                continue;
-//            }
-//            logger.info("start push content by pusher - {}", JSON.toJSONString(taskPusher));
-//
-//            TargetKeyValue targetKeyValue = taskPusher.keySet().iterator().next();
-//            // task-id for unique-key at ConnectKeyValue
-//            // ConnectKeyValue -> new class for name
-//            // also add in ConnectRecord class system property
-//            String taskPushName = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
-//            // add thread pool
-//            for (SinkTask sinkTask : pusherTasks) {
-//                if (sinkTask.getClass().getName().equals(taskPushName)) {
-//                    sinkTask.put(Lists.newArrayList(taskPusher.get(targetKeyValue)));
-//                }
-//            }
+            ConnectRecord targetRecord = circulatorContext.takeTargetMap();
+            if (Objects.isNull(targetRecord)) {
+                logger.info("current target pusher is empty");
+                this.waitForRunning(1000);
+                continue;
+            }
+            if(logger.isDebugEnabled()){
+                logger.debug("start push content by pusher - {}", JSON.toJSONString(targetRecord));
+            }
+
+            Map<String, SinkTask> latestTaskMap = circulatorContext.getPusherTaskMap();
+            String runnerName = targetRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME);
+            SinkTask sinkTask = latestTaskMap.get(runnerName);
+            // add thread pool
+            sinkTask.put(Lists.newArrayList(targetRecord));
         }
     }
 
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
new file mode 100644
index 0000000..58e6b04
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.RefreshTypeEnum;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * event circulator context for transfer and pusher
+ */
+@Component
+public class CirculatorContext implements TargetRunnerListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(LoggerName.EventBus_Listener);
+
+    private BlockingQueue<ConnectRecord> eventRecord = new LinkedBlockingQueue<>(50000);
+
+    private BlockingQueue<ConnectRecord> targetQueue = new LinkedBlockingQueue<>(50000);
+
+    private Map<String/*RunnerName*/, TransformEngine<ConnectRecord>> taskTransformMap = new ConcurrentHashMap<>(20);
+
+    private Map<String/*RunnerName*/, SinkTask> pusherTaskMap = new ConcurrentHashMap<>(20);
+
+    private Plugin plugin;
+
+    public CirculatorContext(Plugin plugin){
+        this.plugin = plugin;
+    }
+
+    /**
+     * initial targetRunnerMap, taskTransformMap, pusherTaskMap
+     * @param targetRunnerConfigs
+     */
+    public void initListenerMetadata(Set<TargetRunnerConfig> targetRunnerConfigs) {
+        if (CollectionUtils.isEmpty(targetRunnerConfigs)) {
+            return;
+        }
+        for (TargetRunnerConfig targetRunnerConfig : targetRunnerConfigs) {
+            onAddTargetRunner(targetRunnerConfig);
+        }
+    }
+
+    @Override
+    public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        refreshRunnerMetadata(targetRunnerConfig, RefreshTypeEnum.ADD);
+    }
+
+    @Override
+    public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        refreshRunnerMetadata(targetRunnerConfig, RefreshTypeEnum.UPDATE);
+    }
+
+    @Override
+    public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+        refreshRunnerMetadata(targetRunnerConfig, RefreshTypeEnum.DELETE);
+    }
+
+    /**
+     * offer event records
+     *
+     * @param connectRecords
+     */
+   public boolean  offerEventRecords(List<ConnectRecord> connectRecords) {
+        return eventRecord.addAll(connectRecords);
+    }
+
+    /**
+     * take event record
+     *
+     * @return
+     */
+    public ConnectRecord takeEventRecord() {
+        if (eventRecord.isEmpty()) {
+            return null;
+        }
+        try {
+            return eventRecord.take();
+        } catch (Exception exception) {
+            logger.error("take event record exception - stack-> ", exception);
+        }
+        return null;
+    }
+
+    public Map<String, TransformEngine<ConnectRecord>> getTaskTransformMap() {
+        return taskTransformMap;
+    }
+
+    public Map<String, SinkTask> getPusherTaskMap() {
+        return pusherTaskMap;
+    }
+
+    public boolean offerTargetTaskQueue(ConnectRecord connectRecord) {
+        return targetQueue.offer(connectRecord);
+    }
+
+    public ConnectRecord takeTargetMap() {
+        if (targetQueue.isEmpty()) {
+            return null;
+        }
+        try {
+            return targetQueue.take();
+        } catch (Exception exception) {
+            exception.printStackTrace();
+        }
+        return null;
+    }
+
+    /**
+     * refresh target runner where config changed
+     * @param targetRunnerConfig
+     * @param refreshTypeEnum
+     */
+    private void refreshRunnerMetadata(TargetRunnerConfig targetRunnerConfig, RefreshTypeEnum refreshTypeEnum) {
+        String runnerName = targetRunnerConfig.getName();
+        switch (refreshTypeEnum){
+            case ADD:
+            case UPDATE:
+                for(Map<String, String> configMap : targetRunnerConfig.getComponents()){
+                    TargetKeyValue targetKeyValue = new TargetKeyValue(configMap);
+                    TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(targetKeyValue, plugin);
+                    taskTransformMap.put(runnerName, transformChain);
+
+                    SinkTask sinkTask = initTargetSinkTask(targetKeyValue);
+                    pusherTaskMap.put(runnerName, sinkTask);
+                }
+                break;
+            case DELETE:
+                taskTransformMap.remove(runnerName);
+                pusherTaskMap.remove(runnerName);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * init target sink task
+     * @param targetKeyValue
+     * @return
+     */
+    private SinkTask initTargetSinkTask(TargetKeyValue targetKeyValue) {
+        String taskClass = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
+        ClassLoader loader = plugin.getPluginClassLoader(taskClass);
+        Class taskClazz;
+        boolean isolationFlag = false;
+        try {
+            if (loader instanceof PluginClassLoader) {
+                taskClazz = ((PluginClassLoader) loader).loadClass(taskClass, false);
+                isolationFlag = true;
+            } else {
+                taskClazz = Class.forName(taskClass);
+            }
+            SinkTask sinkTask = (SinkTask) taskClazz.getDeclaredConstructor().newInstance();
+            sinkTask.init(targetKeyValue);
+            PusherTaskContext sinkTaskContext = new PusherTaskContext(targetKeyValue);
+            sinkTask.start(sinkTaskContext);
+            if (isolationFlag) {
+                Plugin.compareAndSwapLoaders(loader);
+            }
+            return sinkTask;
+        }catch (Exception e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
index 23c499b..f24ce08 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
@@ -18,12 +18,19 @@
 package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
 
 import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.List;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.RefreshTypeEnum;
+
+import java.util.List;
 
 public abstract class EventSubscriber implements TargetRunnerListener {
 
-    abstract void refresh();
+    /**
+     * Refresh subscriber inner data when runner config changed
+     * @param targetRunnerConfig
+     * @param refreshTypeEnum
+     */
+    public abstract void refresh(TargetRunnerConfig targetRunnerConfig, RefreshTypeEnum refreshTypeEnum);
 
     /**
      * Pull connect records from store, Blocking method when is empty.
@@ -46,7 +53,7 @@ public abstract class EventSubscriber implements TargetRunnerListener {
      */
     @Override
     public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
-        this.refresh();
+        this.refresh(targetRunnerConfig, RefreshTypeEnum.ADD);
     }
 
     /**
@@ -56,8 +63,7 @@ public abstract class EventSubscriber implements TargetRunnerListener {
      */
     @Override
     public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
-
-        this.refresh();
+        this.refresh(targetRunnerConfig, RefreshTypeEnum.UPDATE);
     }
 
     /**
@@ -67,7 +73,7 @@ public abstract class EventSubscriber implements TargetRunnerListener {
      */
     @Override
     public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
-        this.refresh();
+        this.refresh(targetRunnerConfig, RefreshTypeEnum.DELETE);
     }
 
 }
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
deleted file mode 100644
index 2d22094..0000000
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
-
-import com.alibaba.fastjson.JSON;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import io.openmessaging.connector.api.component.task.sink.SinkTask;
-import io.openmessaging.connector.api.data.ConnectRecord;
-import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-@Component
-public class ListenerFactory implements TargetRunnerListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(LoggerName.EventBus_Listener);
-
-    private static final String SEMICOLON = ";";
-
-    private static final String SYS_DEFAULT_GROUP = "default-%s-group";
-
-    public static final String QUEUE_OFFSET = "queueOffset";
-
-    private Plugin plugin;
-
-    private BlockingQueue<TargetRunnerConfig> pusherTargetQueue = new LinkedBlockingQueue<>(1000);
-
-    private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000);
-
-    private BlockingQueue<ConnectRecord> eventRecord = new LinkedBlockingQueue<>(50000);
-
-    private BlockingQueue<ConnectRecord> targetQueue = new LinkedBlockingQueue<>(50000);
-
-    private Map<String/*TargetRunnerName*/, TransformEngine<ConnectRecord>> taskTransformMap = new ConcurrentHashMap<>(20);
-    private Map<String/*TargetRunnerName*/, SinkTask> pusherTaskMap = new ConcurrentHashMap<>(20);
-    private Map<String/*TargetRunnerName*/, TargetRunnerConfig> targetRunnerConfigMap = new ConcurrentHashMap<>(20);
-
-    @Value("${rocketmq.namesrvAddr:}")
-    private String namesrvAddr;
-
-    public DefaultLitePullConsumer initDefaultMQPullConsumer(String topic) {
-        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
-        try {
-            consumer.setConsumerGroup(String.format(SYS_DEFAULT_GROUP, topic));
-            consumer.setNamesrvAddr(namesrvAddr);
-            consumer.subscribe(topic, "*");
-            consumer.start();
-        } catch (Exception exception) {
-            logger.error("init default pull consumer exception, topic -" + topic + "-stackTrace-", exception);
-        }
-        return consumer;
-    }
-
-    public TargetRunnerConfig takeTaskConfig() {
-        if (pusherTargetQueue.isEmpty()) {
-            return null;
-        }
-        try {
-            return pusherTargetQueue.take();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        return null;
-    }
-
-    /**
-     * Offer listener event
-     *
-     * @param messageExt
-     * @return
-     */
-    public boolean offerListenEvent(MessageExt messageExt) {
-        return eventMessage.offer(messageExt);
-    }
-
-    public MessageExt takeListenerEvent() {
-        if (eventMessage.isEmpty()) {
-            return null;
-        }
-        try {
-            return eventMessage.take();
-        } catch (Exception exception) {
-            exception.printStackTrace();
-        }
-        return null;
-    }
-
-    /**
-     * offer event records
-     *
-     * @param connectRecords
-     */
-   public boolean  offerEventRecords(List<ConnectRecord> connectRecords) {
-        return eventRecord.addAll(connectRecords);
-    }
-
-    /**
-     * take event record
-     *
-     * @return
-     */
-    public ConnectRecord takeEventRecord() {
-        if (eventRecord.isEmpty()) {
-            return null;
-        }
-        try {
-            return eventRecord.take();
-        } catch (Exception exception) {
-            logger.error("take event record exception - stack-> ", exception);
-        }
-        return null;
-    }
-
-    public String createInstance(String servers) {
-        String[] serversArray = servers.split(";");
-        List<String> serversList = new ArrayList<String>();
-        for (String server : serversArray) {
-            if (!serversList.contains(server)) {
-                serversList.add(server);
-            }
-        }
-        Collections.sort(serversList);
-        return String.valueOf(serversList.toString().hashCode());
-    }
-
-    /**
-     * parse topic list by task config
-     *
-     * @param taskConfig
-     * @return
-     */
-    public List<String> parseTopicList(TargetKeyValue taskConfig) {
-        String messageQueueStr = taskConfig.getString(RuntimerConfigDefine.CONNECT_TOPICNAME);
-        if (StringUtils.isBlank(messageQueueStr)) {
-            return null;
-        }
-        List<String> topicList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
-        return Lists.newArrayList(new HashSet<>(topicList));
-    }
-
-    /**
-     * parse topic list by task config list
-     *
-     * @param taskConfigs
-     * @return
-     */
-    public List<String> parseTopicListByList(List<TargetKeyValue> taskConfigs) {
-        Set<String> allTopicList = Sets.newHashSet();
-        for (TargetKeyValue taskConfig : taskConfigs) {
-            String messageQueueStr = taskConfig.getString(RuntimerConfigDefine.CONNECT_TOPICNAME);
-            if (StringUtils.isBlank(messageQueueStr)) {
-                continue;
-            }
-            List<String> topicList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
-            allTopicList.addAll(topicList);
-        }
-        return Lists.newArrayList(allTopicList);
-    }
-
-    /**
-     * parse msg queue by queue json
-     *
-     * @param messageQueueStr
-     * @return
-     */
-    public MessageQueue parseMessageQueueList(String messageQueueStr) {
-        List<String> messageQueueStrList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
-        if (CollectionUtils.isEmpty(messageQueueStrList) || messageQueueStrList.size() != 3) {
-            return null;
-        }
-        return new MessageQueue(messageQueueStrList.get(0), messageQueueStrList.get(1), Integer.valueOf(messageQueueStrList.get(2)));
-    }
-
-    public RecordPartition convertToRecordPartition(String topic, String brokerName, int queueId) {
-        Map<String, String> map = new HashMap<>();
-        map.put("topic", topic);
-        map.put("brokerName", brokerName);
-        map.put("queueId", queueId + "");
-        RecordPartition recordPartition = new RecordPartition(map);
-        return recordPartition;
-    }
-
-    public RecordOffset convertToRecordOffset(Long offset) {
-        Map<String, String> offsetMap = new HashMap<>();
-        offsetMap.put(QUEUE_OFFSET, offset + "");
-        RecordOffset recordOffset = new RecordOffset(offsetMap);
-        return recordOffset;
-    }
-
-    public boolean offerTargetTaskQueue(ConnectRecord connectRecord) {
-        return targetQueue.offer(connectRecord);
-    }
-
-    public ConnectRecord takeTargetMap() {
-        if (targetQueue.isEmpty()) {
-            return null;
-        }
-        try {
-            return targetQueue.take();
-        } catch (Exception exception) {
-            exception.printStackTrace();
-        }
-        return null;
-    }
-
-    /**
-     * Init sink task transform map key: task config value: transformEngine
-     *
-     * @param taskConfig
-     * @return
-     */
-    private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(
-        Map<String, List<TargetKeyValue>> taskConfig) {
-        Map<TargetKeyValue, TransformEngine<ConnectRecord>> curTaskTransformMap = new HashMap<>();
-        Set<TargetKeyValue> allTaskKeySet = new HashSet<>();
-        for (String connectName : taskConfig.keySet()) {
-            List<TargetKeyValue> taskKeyList = taskConfig.get(connectName);
-            allTaskKeySet.addAll(new HashSet<>(taskKeyList));
-        }
-        for (TargetKeyValue keyValue : allTaskKeySet) {
-            TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(keyValue, plugin);
-            curTaskTransformMap.put(keyValue, transformChain);
-        }
-        logger.info("init sink task transform info succeed, transform map - {}", JSON.toJSONString(curTaskTransformMap));
-        return curTaskTransformMap;
-    }
-
-
-    @Override public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
-        // refresh taskTransformMap & pusherTaskMap & targetRunnerConfigMap
-    }
-
-    @Override public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
-
-    }
-
-    @Override public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
-
-    }
-
-    public Map<String, TransformEngine<ConnectRecord>> getTaskTransformMap() {
-        return taskTransformMap;
-    }
-
-    public Map<String, SinkTask> getPusherTaskMap() {
-        return pusherTaskMap;
-    }
-
-    public Map<String, TargetRunnerConfig> getTargetRunnerConfigMap() {
-        return targetRunnerConfigMap;
-    }
-
-}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
index 7c8e5a3..4ca0507 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
@@ -14,30 +14,276 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
+
 package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
 
+import com.alibaba.fastjson.JSON;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.List;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.RefreshTypeEnum;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
+import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.support.PropertiesLoaderUtils;
+import org.springframework.util.CollectionUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.stream.Collectors;
 
+/**
+ * RocketMQ implement event subscriber
+ */
 public class RocketMQEventSubscriber extends EventSubscriber {
 
-    private ListenerFactory listenerFactory;
+    private static final Logger logger = LoggerFactory.getLogger(RocketMQEventSubscriber.class);
 
-    public RocketMQEventSubscriber(
-        ListenerFactory listenerFactory) {
-        this.listenerFactory = listenerFactory;
-    }
+    private DefaultLitePullConsumer pullConsumer;
+
+    private TargetRunnerConfigObserver runnerConfigObserver;
+
+    private Integer pullTimeOut;
+
+    private String namesrvAddr;
+
+    private static final String SEMICOLON = ";";
+
+    private static final String SYS_DEFAULT_GROUP = "event-bridge-default-group";
+
+    public static final String QUEUE_OFFSET = "queueOffset";
 
-    @Override void refresh() {
+    public RocketMQEventSubscriber(TargetRunnerConfigObserver runnerConfigObserver) {
+        this.runnerConfigObserver = runnerConfigObserver;
+        this.initMqProperties();
+        this.initPullConsumer();
+    }
 
+    @Override
+    public void refresh(TargetRunnerConfig targetRunnerConfig, RefreshTypeEnum refreshTypeEnum) {
+        if(Objects.isNull(pullConsumer)){
+            pullConsumer = initDefaultMQPullConsumer();
+            return;
+        }
+        Set<String> currentTopics = parseTopicsByRunnerConfigs(Sets.newHashSet(targetRunnerConfig));
+        for (String topic : currentTopics){
+            switch (refreshTypeEnum){
+                case ADD:
+                case UPDATE:
+                        subscribe(topic);
+                        break;
+                case DELETE:
+                        unSubscribe(topic);
+                        break;
+                default:
+                    break;
+            }
+        }
     }
 
-    @Override public List<ConnectRecord> pull() {
-        return null;
+    @Override
+    public List<ConnectRecord> pull() {
+        List<MessageExt> messageExts = pullConsumer.poll(pullTimeOut);
+        if (CollectionUtils.isEmpty(messageExts)) {
+            logger.info("consumer poll message empty , consumer - {}", JSON.toJSONString(pullConsumer));
+            return null;
+        }
+        List<ConnectRecord> connectRecords = Lists.newArrayList();
+        for (MessageExt messageExt : messageExts) {
+            ConnectRecord eventRecord = convertToSinkRecord(messageExt);
+            connectRecords.add(eventRecord);
+            if(logger.isDebugEnabled()){
+                logger.debug("offer listen event record -  {} - by message event- {}", eventRecord, messageExt);
+            }
+        }
+        return connectRecords;
     }
 
     @Override
     public void commit(List<ConnectRecord> connectRecordList) {
 
     }
+
+    /**
+     * parse topics by specific target runner configs
+     * @param targetRunnerConfigs
+     * @return
+     */
+    public Set<String> parseTopicsByRunnerConfigs(Set<TargetRunnerConfig> targetRunnerConfigs){
+        if(org.apache.commons.collections.CollectionUtils.isEmpty(targetRunnerConfigs)){
+            logger.warn("target runner config is empty, parse to topic failed!");
+            return null;
+        }
+        Set<String> listenTopics = Sets.newHashSet();
+        for(TargetRunnerConfig runnerConfig : targetRunnerConfigs){
+            List<Map<String,String>> runnerConfigMap = runnerConfig.getComponents();
+            if(org.apache.commons.collections.CollectionUtils.isEmpty(runnerConfigMap)){
+                continue;
+            }
+            listenTopics.addAll(runnerConfigMap.stream().map(item->item.get(RuntimerConfigDefine.CONNECT_TOPICNAME)).collect(Collectors.toSet()));
+        }
+        return listenTopics;
+    }
+
+    /**
+     * init rocketmq ref config
+     */
+    private void initMqProperties() {
+        try {
+            Properties properties = PropertiesLoaderUtils.loadAllProperties("runtimer.properties");
+            namesrvAddr = properties.getProperty("rocketmq.namesrvAddr");
+            pullTimeOut = Integer.valueOf(properties.getProperty("rocketmq.consumer.pullTimeOut"));
+        }catch (Exception exception){
+
+        }
+
+    }
+
+    /**
+     * init rocket mq pull consumer
+     */
+    private void initPullConsumer() {
+        pullConsumer = initDefaultMQPullConsumer();
+    }
+
+    /**
+     * first init default rocketmq pull consumer
+     * @return
+     */
+    public DefaultLitePullConsumer initDefaultMQPullConsumer () {
+        Set<TargetRunnerConfig> targetRunnerConfigs = runnerConfigObserver.getTargetRunnerConfig();
+        Set<String> topics = parseTopicsByRunnerConfigs(targetRunnerConfigs);
+        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
+        consumer.setConsumerGroup(createGroupName(SYS_DEFAULT_GROUP));
+        consumer.setNamesrvAddr(namesrvAddr);
+        try {
+            for(String topic : topics){
+                consumer.subscribe(topic, "*");
+            }
+            consumer.start();
+        } catch (Exception exception) {
+            logger.error("init default pull consumer exception, topic -" + topics.toString() + "-stackTrace-", exception);
+            throw new EventBridgeException(" init rocketmq consumer failed");
+        }
+        return consumer;
+    }
+
+    private String createGroupName(String prefix) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(prefix).append("-");
+        sb.append(RemotingUtil.getLocalAddress()).append("-");
+        sb.append(UtilAll.getPid()).append("-");
+        sb.append(System.nanoTime());
+        return sb.toString().replace(".", "-");
+    }
+
+    private String createInstance(String servers) {
+        String[] serversArray = servers.split(";");
+        List<String> serversList = new ArrayList<String>();
+        for (String server : serversArray) {
+            if (!serversList.contains(server)) {
+                serversList.add(server);
+            }
+        }
+        Collections.sort(serversList);
+        return String.valueOf(serversList.toString().hashCode());
+    }
+
+    /**
+     * parse msg queue by queue json
+     *
+     * @param messageQueueStr
+     * @return
+     */
+    private MessageQueue parseMessageQueueList(String messageQueueStr) {
+        List<String> messageQueueStrList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
+        if (org.apache.commons.collections.CollectionUtils.isEmpty(messageQueueStrList) || messageQueueStrList.size() != 3) {
+            return null;
+        }
+        return new MessageQueue(messageQueueStrList.get(0), messageQueueStrList.get(1), Integer.valueOf(messageQueueStrList.get(2)));
+    }
+
+    /**
+     * MessageExt convert to connect record
+     * @param messageExt
+     * @return
+     */
+    private ConnectRecord convertToSinkRecord(MessageExt messageExt) {
+        Map<String, String> properties = messageExt.getProperties();
+        Schema schema;
+        Long timestamp;
+        ConnectRecord sinkRecord;
+        String connectTimestamp = properties.get(RuntimerConfigDefine.CONNECT_TIMESTAMP);
+        timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
+        String connectSchema = properties.get(RuntimerConfigDefine.CONNECT_SCHEMA);
+        schema = StringUtils.isNotEmpty(connectSchema) ? JSON.parseObject(connectSchema, Schema.class) : null;
+        byte[] body = messageExt.getBody();
+        RecordPartition recordPartition = convertToRecordPartition(messageExt.getTopic(), messageExt.getBrokerName(), messageExt.getQueueId());
+        RecordOffset recordOffset = convertToRecordOffset(messageExt.getQueueOffset());
+        String bodyStr = new String(body, StandardCharsets.UTF_8);
+        sinkRecord = new ConnectRecord(recordPartition, recordOffset, timestamp, schema, bodyStr);
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME, messageExt.getTopic());
+        if (MapUtils.isNotEmpty(properties)) {
+            for (Map.Entry<String, String> entry : properties.entrySet()) {
+                keyValue.put(entry.getKey(), entry.getValue());
+            }
+        }
+        sinkRecord.addExtension(keyValue);
+        return sinkRecord;
+    }
+
+    private RecordPartition convertToRecordPartition(String topic, String brokerName, int queueId) {
+        Map<String, String> map = new HashMap<>();
+        map.put("topic", topic);
+        map.put("brokerName", brokerName);
+        map.put("queueId", queueId + "");
+        RecordPartition recordPartition = new RecordPartition(map);
+        return recordPartition;
+    }
+
+    private RecordOffset convertToRecordOffset(Long offset) {
+        Map<String, String> offsetMap = new HashMap<>();
+        offsetMap.put(QUEUE_OFFSET, offset + "");
+        RecordOffset recordOffset = new RecordOffset(offsetMap);
+        return recordOffset;
+    }
+
+    /**
+     * new topic for subscribe
+     * @param topic
+     */
+    private void subscribe(String topic) {
+        try {
+            pullConsumer.subscribe(topic, "*");
+        } catch (MQClientException exception) {
+            logger.error("rocketmq event subscribe new topic failed, stack trace - ", exception);
+        }
+    }
+
+    /**
+     * unsubscribe old topic
+     * @param topic
+     */
+    private void unSubscribe(String topic) {
+        pullConsumer.unsubscribe(topic);
+    }
+
 }
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
index f528b7b..8284e8a 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
@@ -14,6 +14,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
+
 package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
 
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
index c3c6a07..5d52128 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
@@ -18,11 +18,10 @@
 package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity;
 
 import io.openmessaging.KeyValue;
-import org.springframework.util.IdGenerator;
-import org.springframework.util.SimpleIdGenerator;
 
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,6 +46,11 @@ public class TargetKeyValue implements KeyValue, Serializable {
         targetKeyId = UUID.randomUUID().toString();
     }
 
+    public TargetKeyValue(Map<String, String> targetMap) {
+        properties = new ConcurrentHashMap<>(targetMap);
+        targetKeyId = UUID.randomUUID().toString();
+    }
+
     @Override
     public KeyValue put(String key, int value) {
         properties.put(key, String.valueOf(value));
@@ -136,24 +140,24 @@ public class TargetKeyValue implements KeyValue, Serializable {
     }
 
     @Override
-    public boolean equals(Object obj) {
-
-        if (obj != null && obj.getClass() == this.getClass()) {
-            TargetKeyValue keyValue = (TargetKeyValue) obj;
-            return this.properties.equals(keyValue.getProperties());
-        }
-        return false;
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        TargetKeyValue that = (TargetKeyValue) o;
+        return Objects.equals(targetKeyId, that.targetKeyId) && Objects.equals(properties, that.properties);
     }
 
     @Override
     public int hashCode() {
-        return properties.hashCode();
+        return Objects.hash(targetKeyId, properties);
     }
 
-    @Override public String toString() {
-        return "ConnectKeyValue{" +
-            "properties=" + properties +
-            '}';
+    @Override
+    public String toString() {
+        return "TargetKeyValue{" +
+                "targetKeyId='" + targetKeyId + '\'' +
+                ", properties=" + properties +
+                '}';
     }
 
     public String getTargetKeyId() {
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/enums/RefreshTypeEnum.java
similarity index 57%
copy from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
copy to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/enums/RefreshTypeEnum.java
index 7c8e5a3..9e8f6cd 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/enums/RefreshTypeEnum.java
@@ -14,30 +14,14 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
 
-import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.List;
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums;
 
-public class RocketMQEventSubscriber extends EventSubscriber {
-
-    private ListenerFactory listenerFactory;
-
-    public RocketMQEventSubscriber(
-        ListenerFactory listenerFactory) {
-        this.listenerFactory = listenerFactory;
-    }
-
-    @Override void refresh() {
-
-    }
-
-    @Override public List<ConnectRecord> pull() {
-        return null;
-    }
-
-    @Override
-    public void commit(List<ConnectRecord> connectRecordList) {
-
-    }
-}
\ No newline at end of file
+/**
+ * runner refresh type
+ */
+public enum RefreshTypeEnum {
+    ADD,
+    UPDATE,
+    DELETE
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
index 500f300..04895cb 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
@@ -37,6 +37,8 @@ public class RuntimerConfigDefine {
 
     public static final String TASK_CLASS = "task-class";
 
+    public static final String RUNNER_NAME = "runner-name";
+
     public static final String TASK_ID = "task-id";
 
     public static final String TASK_TYPE = "task-type";
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
index b963c72..bbb69b3 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -37,15 +37,20 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+import org.springframework.stereotype.Service;
 
 @Slf4j
 public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver {
 
     private String pathName;
 
+    public static final String DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME = "target-runner.json";
+
+
     private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
         ThreadUtils.newThreadFactory("TargetRunnerConfigOnFileObserver", false));
 
@@ -57,6 +62,9 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig
 
     public TargetRunnerConfigOnFileObserver() {
         super.getTargetRunnerConfig().addAll(getLatestTargetRunnerConfig());
+        if(StringUtils.isEmpty(pathName)){
+            this.pathName = getConfigFilePath();
+        }
         this.addListen(pathName, this);
     }
 
@@ -66,8 +74,7 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig
         try {
             File file = new File(pathName);
             config = FileUtils.readFileToString(file, "UTF-8");
-            Type workerConfigType = new TypeToken<HashSet<TargetRunnerConfig>>() {
-            }.getType();
+            Type workerConfigType = new TypeToken<HashSet<TargetRunnerConfig>>() {}.getType();
             Set<TargetRunnerConfig> taskConfigList = new Gson().fromJson(config, workerConfigType);
             return taskConfigList;
         } catch (IOException e) {
@@ -78,16 +85,14 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig
         }
     }
 
-    public void addListen(String pathName,
-        TargetRunnerConfigOnFileObserver pusherConfigOnFileService) {
+    public void addListen(String pathName, TargetRunnerConfigOnFileObserver pusherConfigOnFileService) {
         log.info("Watching task file changing:{}", pathName);
         int index = pathName.lastIndexOf("/");
         String filePath = pathName.substring(0, index);
         String fileName = pathName.substring(index + 1);
         service.scheduleAtFixedRate(() -> {
             try {
-                WatchService watchService = FileSystems.getDefault()
-                    .newWatchService();
+                WatchService watchService = FileSystems.getDefault().newWatchService();
                 Path path = Paths.get(filePath);
                 path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
                     StandardWatchEventKinds.ENTRY_MODIFY);
@@ -136,4 +141,8 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig
         return map;
     }
 
+    private String getConfigFilePath() {
+        return this.getClass().getClassLoader().getResource(DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME).getPath();
+    }
+
 }
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/resources/runtimer.properties b/adapter/runtimer/src/main/resources/runtimer.properties
new file mode 100644
index 0000000..b5659c1
--- /dev/null
+++ b/adapter/runtimer/src/main/resources/runtimer.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+## rocketmq
+rocketmq.namesrvAddr=localhost:9876
+rocketmq.consumer.pullTimeOut = 3000
+rocketmq.cluster.name=DefaultCluster
+## runtimer
+rumtimer.name=eventbridge-runtimer
+runtimer.pluginpath=/Users/Local/eventbridge/plugin
+runtimer.storePathRootDir=/Users/Local/eventbridge/store
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 95bc7de..7e319aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,7 @@
         <cloudevents.version>2.3.0</cloudevents.version>
         <apache.commons-text.version>1.10.0</apache.commons-text.version>
         <mockito.version>2.13.0</mockito.version>
+        <jupiter.version>5.9.2</jupiter.version>
         <caffeine.version>2.9.3</caffeine.version>
         <fastjson.version>1.2.76</fastjson.version>
         <jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>