You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2023/05/07 08:20:17 UTC

[rocketmq-eventbridge] branch feat/runtimer-db-observer created (now f7c289f)

This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a change to branch feat/runtimer-db-observer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


      at f7c289f  feat:support runtimer db observer

This branch includes the following new commits:

     new f7c289f  feat:support runtimer db observer

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[rocketmq-eventbridge] 01/01: feat:support runtimer db observer

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch feat/runtimer-db-observer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git

commit f7c289f39c325d1865d05e02f41c72a39bd84853
Author: 2011shenlin <20...@gmail.com>
AuthorDate: Sun May 7 16:19:54 2023 +0800

    feat:support runtimer db observer
---
 adapter/persistence/pom.xml                        |   4 +
 .../MybatisEventTargetRunnerRepository.java        |  12 ++
 .../resources/mybatis/EventTargetRunnerMapper.xml  |  14 +-
 adapter/rpc/pom.xml                                |   4 +
 .../RocketMQConnectTargetRunnerAPIImpl.java        | 141 ---------------------
 .../rpc/impl/connect/RocketMQConverter.java        |  12 +-
 .../impl/runtime/RuntimeTargetRunnerAPIImpl.java   |  92 ++++++++++++++
 adapter/runtimer/pom.xml                           |   4 +
 .../service/TargetRunnerConfigOnDBObserver.java    |  41 +++++-
 .../domain/model/run/EventTargetRunnerService.java |  37 +++++-
 .../domain/model/target/EventTargetService.java    |   3 +-
 .../repository/EventTargetRunnerRepository.java    |   3 +
 12 files changed, 205 insertions(+), 162 deletions(-)

diff --git a/adapter/persistence/pom.xml b/adapter/persistence/pom.xml
index e3e0047..fdbb7a3 100644
--- a/adapter/persistence/pom.xml
+++ b/adapter/persistence/pom.xml
@@ -31,6 +31,10 @@
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-eventbridge-domain</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-eventbridge-adapter-runtimer</artifactId>
+        </dependency>
         <!-- Framework -->
         <dependency>
             <groupId>org.springframework</groupId>
diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
index d5e647f..4edeecd 100644
--- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
+++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
@@ -16,8 +16,11 @@
  */
 package org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.repository;
 
+import com.google.common.collect.Lists;
 import com.google.gson.Gson;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.converter.EventTargetRunnerConverter;
 import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.dataobject.EventTargetRunnerDO;
 import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.mapper.EventTargetRunnerMapper;
@@ -67,4 +70,13 @@ public class MybatisEventTargetRunnerRepository implements EventTargetRunnerRepo
             == 1;
     }
 
+    @Override
+    public List<EventTargetRunner> listEventTargetRunners(String accountId, String eventBusName, String eventRuleName) {
+        List<EventTargetRunnerDO> eventTargetRunnerDOS = eventTargetRunnerMapper.listEventTargetRunners(accountId, eventBusName, eventRuleName);
+        if (eventTargetRunnerDOS == null || eventTargetRunnerDOS.isEmpty()) {
+            return Lists.newArrayListWithCapacity(0);
+        }
+        return eventTargetRunnerDOS.stream().map(eventTargetRunnerDO -> EventTargetRunnerConverter.convert(eventTargetRunnerDO)).collect(Collectors.toList());
+    }
+
 }
\ No newline at end of file
diff --git a/adapter/persistence/src/main/resources/mybatis/EventTargetRunnerMapper.xml b/adapter/persistence/src/main/resources/mybatis/EventTargetRunnerMapper.xml
index fe1ca91..42591d8 100644
--- a/adapter/persistence/src/main/resources/mybatis/EventTargetRunnerMapper.xml
+++ b/adapter/persistence/src/main/resources/mybatis/EventTargetRunnerMapper.xml
@@ -57,9 +57,17 @@
         <include refid="COLUMN_LIST"/>
         FROM
         <include refid="TABLE_NAME"/>
-        WHERE account_id = #{accountId}
-        AND bus = #{eventBusName}
-        AND rule = #{eventRuleName}
+        WHERE
+        id>=0
+        <if test="accountId != null">
+            AND account_id = #{accountId}
+        </if>
+        <if test="eventBusName != null">
+            AND bus = #{eventBusName}
+        </if>
+        <if test="eventRuleName != null">
+            AND rule = #{eventRuleName}
+        </if>
     </select>
 
     <delete id="deleteEventTargetRunner">
diff --git a/adapter/rpc/pom.xml b/adapter/rpc/pom.xml
index a1e981e..7aaf521 100644
--- a/adapter/rpc/pom.xml
+++ b/adapter/rpc/pom.xml
@@ -28,6 +28,10 @@
     <version>1.0.0</version>
     <dependencies>
         <!-- Project Modules -->
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-eventbridge-adapter-runtimer</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-eventbridge-adapter-persistence</artifactId>
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
deleted file mode 100644
index 96cc45d..0000000
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect;
-
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.Lists;
-import com.google.gson.Gson;
-import java.util.ArrayList;
-import java.util.Map;
-import lombok.SneakyThrows;
-import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectSourceRunnerContext;
-import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectTargetRunnerContext;
-import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.ActionStatusResponseEnum;
-import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.CreateSinkConnectorRequest;
-import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.TransformRequest;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.domain.common.enums.EventTargetStatusEnum;
-import org.apache.rocketmq.eventbridge.domain.model.Component;
-import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
-import org.apache.rocketmq.eventbridge.domain.storage.EventDataRepository;
-import org.apache.rocketmq.eventbridge.domain.rpc.TargetRunnerAPI;
-import org.apache.rocketmq.eventbridge.tools.transform.TransformParam;
-
-@org.springframework.stereotype.Component
-public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implements TargetRunnerAPI {
-
-    private final RocketMQConnectClient rocketMQConnectClient;
-
-    public RocketMQConnectTargetRunnerAPIImpl(EventDataRepository eventDataRepository,
-        RocketMQConnectClient rocketMQConnectClient) {
-        super(eventDataRepository);
-        this.rocketMQConnectClient = rocketMQConnectClient;
-    }
-
-    @SneakyThrows
-    @Override
-    public String createAndStartEventTargetRunner(String accountId, String name, Component source, Component target,
-        String filterPattern, Map<String, TransformParam> targetTransform, RunOptions runOptions) {
-        String topicName = this.parseTopicName(source);
-        String sinkConnectorClass = this.parseConnectorClass(target);
-        Map<String, Object> sinkConnectorConfig = this.parseConnectorConfig(target);
-        TransformRequest filterTransform = this.buildEventBridgeFilterTransform(filterPattern);
-        TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
-        TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
-            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
-            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
-        return new Gson().toJson(context);
-    }
-
-    @Override
-    @SneakyThrows
-    public String updateEventTargetRunner(String accountId, String name, Component source, Component target,
-        String filterPattern, Map<String, TransformParam> targetTransform, RunOptions runOptions, String runContext) {
-        String topicName = this.parseTopicName(source);
-        String sinkConnectorClass = this.parseConnectorClass(target);
-        Map<String, Object> sinkConnectorConfig = this.parseConnectorConfig(target);
-        TransformRequest filterTransform = this.buildEventBridgeFilterTransform(filterPattern);
-        TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
-        //create
-        TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
-            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        //stop
-        this.delete(runContext);
-        rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
-            sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
-        RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
-        return new Gson().toJson(context);
-    }
-
-    @Override
-    public EventTargetStatusEnum getEventTargetRunnerStatus(String runContext) {
-        RocketMQConnectSourceRunnerContext context = new Gson().fromJson(runContext,
-            RocketMQConnectSourceRunnerContext.class);
-        String status = rocketMQConnectClient.getConnectorStatus(context.getConnectorName());
-        return ActionStatusResponseEnum.parseEventTargetStatusEnum(status);
-    }
-
-    @Override
-    public boolean delete(String runContext) {
-        RocketMQConnectSourceRunnerContext context = new Gson().fromJson(runContext,
-            RocketMQConnectSourceRunnerContext.class);
-        return rocketMQConnectClient.stop(context.getConnectorName());
-    }
-
-    @Override
-    public boolean pause(String runContext) {
-        RocketMQConnectSourceRunnerContext context = new Gson().fromJson(runContext,
-            RocketMQConnectSourceRunnerContext.class);
-        return rocketMQConnectClient.stop(context.getConnectorName());
-    }
-
-    @Override
-    public boolean start(String runContext) {
-        RocketMQConnectSourceRunnerContext context = new Gson().fromJson(runContext,
-            RocketMQConnectSourceRunnerContext.class);
-        return rocketMQConnectClient.start(context.getConnectorName());
-    }
-
-    /**
-     * init sink task config
-     *
-     * @param name
-     * @param topicName
-     * @param sinkClass
-     * @param sinkConfig
-     * @param transforms
-     * @return
-     */
-    private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass,
-        Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) {
-        CreateSinkConnectorRequest request = new CreateSinkConnectorRequest();
-        request.setName(name);
-        request.setTopicName(topicName);
-        request.setConnectorClass(sinkClass);
-        request.setConnectorConfig(sinkConfig);
-        request.setTransforms(transforms);
-        Map<String, Object> sinkTaskMap = request.getRequestObject();
-        TargetKeyValue targetKeyValue = new TargetKeyValue();
-        for (String key : sinkTaskMap.keySet()) {
-            targetKeyValue.put(key, sinkTaskMap.get(key).toString());
-        }
-        return targetKeyValue;
-    }
-}
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConverter.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConverter.java
index d1f5708..b828830 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConverter.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConverter.java
@@ -55,7 +55,7 @@ public class RocketMQConverter {
         this.eventDataRepository = eventDataRepository;
     }
 
-    protected TransformRequest buildEventBridgeTransform(Map<String, TransformParam> targetTransform) {
+    public static TransformRequest buildEventBridgeTransform(Map<String, TransformParam> targetTransform) {
         Map<String, Object> config = Maps.newHashMap();
         targetTransform.entrySet()
             .forEach(entry -> {
@@ -65,28 +65,28 @@ public class RocketMQConverter {
         return new TransformRequest(EB_TRANSFORM_NAME, config);
     }
 
-    protected TransformRequest buildEventBridgeFilterTransform(String filterPattern) {
+    public static TransformRequest buildEventBridgeFilterTransform(String filterPattern) {
         Map<String, Object> config = Maps.newHashMap();
         config.put(FILTER_TRANSFORM_NAME_KEY, filterPattern);
         config.put(KEY_CLASS, FILTER_TRANSFORM_CLASS);
         return new TransformRequest(FILTER_TRANSFORM_NAME, config);
     }
 
-    protected TransformRequest buildCloudEventTransform(Map<String, Object> transformPattern) {
+    public static TransformRequest buildCloudEventTransform(Map<String, Object> transformPattern) {
         transformPattern.put(KEY_CLASS, EB_CLOUD_EVENTTRANSFORM_CLASS);
         return new TransformRequest(CLOUD_EVENT_TRANSFORM_NAME, transformPattern);
     }
 
-    protected Map<String, Object> parseConnectorConfig(Component component) {
+    public Map<String, Object> parseConnectorConfig(Component component) {
         return component.getConfig();
     }
 
-    protected String parseConnectorClass(Component component) {
+    public String parseConnectorClass(Component component) {
         return (String) component.getConfig()
             .get(KEY_CLASS);
     }
 
-    protected String parseTopicName(Component source) {
+    public String parseTopicName(Component source) {
         String eventBusName = (String) (source.getConfig()
             .get(EVENT_BUS_NAME_KEY));
         String accountId = (String) (source.getConfig()
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
new file mode 100644
index 0000000..fed9b4b
--- /dev/null
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.rpc.impl.runtime;
+
+import com.google.common.collect.Lists;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.RocketMQConverter;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.domain.common.enums.EventTargetStatusEnum;
+import org.apache.rocketmq.eventbridge.domain.model.Component;
+import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
+import org.apache.rocketmq.eventbridge.domain.rpc.TargetRunnerAPI;
+import org.apache.rocketmq.eventbridge.tools.transform.TransformParam;
+
+@org.springframework.stereotype.Component
+public class RuntimeTargetRunnerAPIImpl implements TargetRunnerAPI {
+
+    @Override
+    public String createAndStartEventTargetRunner(String accountId, String name, Component source, Component target,
+        String filterPattern, Map<String, TransformParam> targetTransform, RunOptions runOptions) {
+        return new Gson().toJson(this.buildTargetRunnerConfig(accountId, name, source, target, filterPattern, targetTransform, runOptions));
+    }
+
+    @Override
+    public String updateEventTargetRunner(String accountId, String name, Component source, Component target,
+        String filterPattern, Map<String, TransformParam> targetTransform, RunOptions runOptions, String runContext) {
+        return new Gson().toJson(this.buildTargetRunnerConfig(accountId, name, source, target, filterPattern, targetTransform, runOptions));
+    }
+
+    private TargetRunnerConfig buildTargetRunnerConfig(String accountId, String name, Component source,
+        Component target,
+        String filterPattern, Map<String, TransformParam> targetTransform, RunOptions runOptions) {
+        TargetRunnerConfig targetRunnerConfig = new TargetRunnerConfig();
+        targetRunnerConfig.setName(name);
+        List<Map<String, String>> components = Lists.newArrayList();
+        targetRunnerConfig.setComponents(components);
+        Map<String, String> sourceComponent = new Gson().fromJson(new Gson().toJson(source
+            .getConfig()), new TypeToken<Map<String, String>>() {
+        }.getType());
+        Map<String, String> filterComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeFilterTransform(filterPattern)
+            .getConfig()), new TypeToken<Map<String, String>>() {
+        }.getType());
+
+        Map<String, String> transformComponent = new Gson().fromJson(new Gson().toJson(RocketMQConverter.buildEventBridgeTransform(targetTransform)
+            .getConfig()), new TypeToken<Map<String, String>>() {
+        }.getType());
+        Map<String, String> targetComponent = new Gson().fromJson(new Gson().toJson(target
+            .getConfig()), new TypeToken<Map<String, String>>() {
+        }.getType());
+        components.add(sourceComponent);
+        components.add(filterComponent);
+        components.add(transformComponent);
+        components.add(targetComponent);
+        return targetRunnerConfig;
+    }
+
+    @Override public EventTargetStatusEnum getEventTargetRunnerStatus(String runContext) {
+        return null;
+    }
+
+    @Override public boolean delete(String runContext) {
+        //do nothing
+        return true;
+    }
+
+    @Override public boolean pause(String runContext) {
+        return false;
+    }
+
+    @Override public boolean start(String runContext) {
+        return false;
+    }
+
+}
diff --git a/adapter/runtimer/pom.xml b/adapter/runtimer/pom.xml
index 158f76c..ab8a7bc 100644
--- a/adapter/runtimer/pom.xml
+++ b/adapter/runtimer/pom.xml
@@ -31,6 +31,10 @@
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-eventbridge-domain</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.openmessaging</groupId>
             <artifactId>openmessaging-api</artifactId>
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
index 1841e3c..5e74f56 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
@@ -17,30 +17,63 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.PostConstruct;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.domain.model.run.EventTargetRunner;
+import org.apache.rocketmq.eventbridge.domain.repository.EventTargetRepository;
+import org.apache.rocketmq.eventbridge.domain.repository.EventTargetRunnerRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import static org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.TARGET_RUNNER_KEY;
 
 @Slf4j
+@Component
 public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigObserver {
 
     private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
         ThreadUtils.newThreadFactory("TargetRunnerConfigOnDBObserver", false));
 
+    @Autowired
+    EventTargetRunnerRepository eventTargetRunnerRepository;
+
+    @Autowired
+    EventTargetRepository eventTargetRepository;
+
     public TargetRunnerConfigOnDBObserver() {
     }
 
     @Override
+    @Transactional
     public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
-        return null;
+        List<EventTargetRunner> eventTargetRunners = eventTargetRunnerRepository.listEventTargetRunners(null, null, null);
+        Set<TargetRunnerConfig> targetRunnerConfigs = Sets.newHashSet();
+        for (EventTargetRunner eventTargetRunner : eventTargetRunners) {
+            targetRunnerConfigs.add(new Gson().fromJson(eventTargetRunner.getRunContext(), TargetRunnerConfig.class));
+        }
+        return targetRunnerConfigs;
+    }
+
+    private Map<String, String> buildEventBusComponent(String eventBusName) {
+        Map<String, String> component = Maps.newHashMap();
+        component.put(TARGET_RUNNER_KEY, eventBusName);
+        return component;
     }
 
-    public void addListen(
-        TargetRunnerConfigOnDBObserver pusherConfigOnFileService) {
+    @PostConstruct
+    public void addListen() {
         service.scheduleAtFixedRate(() -> {
             try {
                 Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
@@ -52,7 +85,7 @@ public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigOb
             } catch (Throwable e) {
                 log.error("Watch failed.", e);
             }
-        }, 0, 3, TimeUnit.SECONDS);
+        }, 0, 30, TimeUnit.SECONDS);
     }
 
 }
\ No newline at end of file
diff --git a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/EventTargetRunnerService.java b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/EventTargetRunnerService.java
index 857ad1b..5a61f3b 100644
--- a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/EventTargetRunnerService.java
+++ b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/EventTargetRunnerService.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.eventbridge.domain.model.run;
+
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import java.util.Map;
@@ -29,21 +30,25 @@ import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
 import org.apache.rocketmq.eventbridge.tools.transform.TransformParam;
 import org.springframework.dao.DuplicateKeyException;
 import org.springframework.stereotype.Service;
+
 import static org.apache.rocketmq.eventbridge.config.EventBridgeConstants.ACCOUNT_ID_KEY;
 import static org.apache.rocketmq.eventbridge.config.EventBridgeConstants.EVENT_BUS_NAME_KEY;
 import static org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.EventTargetAlreadyExist;
 import static org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.EventTargetNotExist;
+
 @Service
 public class EventTargetRunnerService extends AbstractRunnerService {
     private final EventTargetRunnerRepository eventTargetRunnerRepository;
     private final TargetRunnerAPI targetRunnerAPI;
     private final EventTargetClassService eventTargetClassService;
+
     public EventTargetRunnerService(EventTargetRunnerRepository eventTargetRunnerRepository,
         TargetRunnerAPI targetRunnerAPI, EventTargetClassService eventTargetClassService) {
         this.eventTargetRunnerRepository = eventTargetRunnerRepository;
         this.targetRunnerAPI = targetRunnerAPI;
         this.eventTargetClassService = eventTargetClassService;
     }
+
     public boolean createEventTargetRunner(String accountId, String eventBusName, String eventRuleName,
         EventTarget eventTarget, String filterPattern) {
         try {
@@ -64,6 +69,7 @@ public class EventTargetRunnerService extends AbstractRunnerService {
         }
         return true;
     }
+
     public boolean deleteTargetRunner(String accountId, String eventBusName, String eventRuleName,
         String eventTargetName) {
         if (Strings.isNullOrEmpty(eventTargetName)) {
@@ -76,6 +82,7 @@ public class EventTargetRunnerService extends AbstractRunnerService {
         eventTargetRunnerRepository.deleteEventTargetRunner(accountId, eventBusName, eventRuleName, eventTargetName);
         return true;
     }
+
     public boolean updateTargetRunner(String accountId, String eventBusName, String eventRuleName,
         EventTarget eventTarget, String filterPattern) {
         this.checkExist(accountId, eventBusName, eventRuleName, eventTarget.getName());
@@ -93,6 +100,7 @@ public class EventTargetRunnerService extends AbstractRunnerService {
             eventTarget.getConfig(), eventTarget.getRunOptions(), runContext);
         return true;
     }
+
     public EventTargetRunner getEventTargetRunner(String accountId, String eventBusName, String eventRuleName,
         String eventTargetName) {
         EventTargetRunner eventTargetRunner = eventTargetRunnerRepository.getEventTargetRunner(accountId, eventBusName,
@@ -101,25 +109,40 @@ public class EventTargetRunnerService extends AbstractRunnerService {
         eventTargetRunner.setStatus(status);
         return eventTargetRunner;
     }
+
     public boolean pause(String accountId, String eventBusName, String eventRuleName,
         String eventTargetName) {
-        EventTargetRunner eventTargetRunner = eventTargetRunnerRepository.getEventTargetRunner(accountId, eventBusName,
-            eventRuleName, eventTargetName);
-        targetRunnerAPI.pause(eventTargetRunner.getRunContext());
-        return true;
+        return eventTargetRunnerRepository.deleteEventTargetRunner(accountId, eventBusName, eventRuleName,
+            eventTargetName);
     }
-    public boolean start(String accountId, String eventBusName, String eventRuleName, String eventTargetName) {
+
+    public boolean start(String accountId, String eventBusName, String eventRuleName, EventTarget eventTarget,
+        String filterPattern) {
         EventTargetRunner eventTargetRunner = eventTargetRunnerRepository.getEventTargetRunner(accountId, eventBusName,
-            eventRuleName, eventTargetName);
-        targetRunnerAPI.start(eventTargetRunner.getRunContext());
+            eventRuleName, eventTarget.getName());
+        if (eventTargetRunner == null || Strings.isNullOrEmpty(eventTargetRunner.getRunContext())) {
+            Component source = this.buildDefaultSourceComponent(accountId, eventBusName);
+            Component target = eventTargetClassService.renderConfig(accountId, eventTarget.getClassName(),
+                eventTarget.getConfig());
+            Map<String, TransformParam> targetTransform = eventTargetClassService.renderTargetTransform(accountId,
+                eventTarget.getClassName(), eventTarget.getConfig());
+            String runContext = targetRunnerAPI.createAndStartEventTargetRunner(accountId,
+                this.buildRunnerName(accountId, eventBusName, eventRuleName, eventTarget.getName()), source, target,
+                filterPattern, targetTransform, eventTarget.getRunOptions());
+            eventTargetRunnerRepository.createTargetRunner(accountId, eventBusName, eventRuleName,
+                eventTarget.getName(), eventTarget.getClassName(), eventTarget.getConfig(),
+                eventTarget.getRunOptions(), runContext);
+        }
         return true;
     }
+
     private Component buildDefaultSourceComponent(String accountId, String eventBusName) {
         Map<String, Object> config = Maps.newHashMap();
         config.put(EVENT_BUS_NAME_KEY, eventBusName);
         config.put(ACCOUNT_ID_KEY, accountId);
         return new Component(this.buildDefaultComponentName(), config);
     }
+
     public void checkExist(String accountId, String eventBusName, String eventRuleName, String eventTargetName) {
         EventTargetRunner eventTargetRunner = this.eventTargetRunnerRepository.getEventTargetRunner(accountId,
             eventBusName, eventRuleName, eventTargetName);
diff --git a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/target/EventTargetService.java b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/target/EventTargetService.java
index d3ac0ba..eda4481 100644
--- a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/target/EventTargetService.java
+++ b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/target/EventTargetService.java
@@ -160,8 +160,9 @@ public class EventTargetService extends AbstractRunnerService {
     public boolean enableTargets(String accountId, String eventBusName, String eventRuleName) {
         List<EventTarget> eventTargetList = eventTargetRepository.listEventTargets(accountId, eventBusName,
             eventRuleName);
+        EventRule eventRule = eventRuleService.getEventRule(accountId, eventBusName, eventRuleName);
         eventTargetList.forEach(eventTarget -> {
-            eventTargetRunnerService.start(accountId, eventBusName, eventRuleName, eventTarget.getName());
+            eventTargetRunnerService.start(accountId, eventBusName, eventRuleName, eventTarget, eventRule.getFilterPattern());
         });
         return true;
     }
diff --git a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventTargetRunnerRepository.java b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventTargetRunnerRepository.java
index 03a8265..45216b6 100644
--- a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventTargetRunnerRepository.java
+++ b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventTargetRunnerRepository.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.eventbridge.domain.repository;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.rocketmq.eventbridge.domain.model.run.EventTargetRunner;
 import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
@@ -35,4 +36,6 @@ public interface EventTargetRunnerRepository {
     boolean deleteEventTargetRunner(String accountId, String eventBusName, String eventRuleName,
         String eventTargetName);
 
+    List<EventTargetRunner> listEventTargetRunners(String accountId, String eventBusName, String eventRuleName);
+
 }