You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2023/03/27 08:04:37 UTC

[rocketmq-eventbridge] branch runtimer updated: feat:add watch runner config. (#63)

This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/runtimer by this push:
     new 8ab725f  feat:add watch runner config. (#63)
8ab725f is described below

commit 8ab725f7d627e71f219e709af0af437de5660c03
Author: shenlin <20...@gmail.com>
AuthorDate: Mon Mar 27 16:04:30 2023 +0800

    feat:add watch runner config. (#63)
    
    feat:add watch runner config.
---
 .licenserc.yaml                                    |   1 +
 adapter/runtimer/pom.xml                           |  49 +++++++-
 .../runtimer/common/entity/TargetRunnerConfig.java |  49 +++++++-
 .../service/TargetRunnerConfigOnFileObserver.java  |  85 ++++++++++---
 .../src/main/resources/target-runner.config        |   7 --
 .../runtimer/src/main/resources/target-runner.json |  23 ++++
 .../TargetRunnerConfigOnFileObserverTest.java      | 103 ++++++++++++++++
 .../runtimer/service/TargetRunnerConfigUtil.java   | 132 +++++++++++++++++++++
 .../runtimer/service/TestConstants.java            |  23 ++++
 9 files changed, 438 insertions(+), 34 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index f01ce66..57fa815 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -24,6 +24,7 @@ header:
   paths-ignore:
     - 'dist'
     - 'config'
+    - '**/*.json'
     - 'licenses'
     - '**/*.md'
     - 'LICENSE'
diff --git a/adapter/runtimer/pom.xml b/adapter/runtimer/pom.xml
index 9296b42..49578b4 100644
--- a/adapter/runtimer/pom.xml
+++ b/adapter/runtimer/pom.xml
@@ -22,6 +22,14 @@
 
     <artifactId>rocketmq-eventbridge-adapter-runtimer</artifactId>
     <version>1.0.0</version>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <commons.io.version>2.8.0</commons.io.version>
+        <junit.version>4.10</junit.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>io.openmessaging</groupId>
@@ -59,11 +67,44 @@
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>${commons.io.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-eventbridge-common</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>RELEASE</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>RELEASE</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>4.0.3</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
-    <properties>
-        <maven.compiler.source>8</maven.compiler.source>
-        <maven.compiler.target>8</maven.compiler.target>
-    </properties>
 
 </project>
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
index 45bc5f1..a2eb405 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import lombok.Data;
@@ -33,7 +34,7 @@ public class TargetRunnerConfig implements Serializable {
     /**
      * All data are reserved in this map.
      */
-    private Map<String, String> properties;
+    private List<Map<String, String>> components;
 
     @Override
     public boolean equals(Object o) {
@@ -42,12 +43,12 @@ public class TargetRunnerConfig implements Serializable {
         if (o == null || getClass() != o.getClass())
             return false;
         TargetRunnerConfig config = (TargetRunnerConfig) o;
-        return Objects.equals(name, config.name) && Objects.equals(properties, config.properties);
+        return Objects.equals(name, config.name) && isEqualsComponents(components, config.getComponents());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, properties);
+        return Objects.hash(name, components);
     }
 
     @Override
@@ -55,7 +56,47 @@ public class TargetRunnerConfig implements Serializable {
         //TODO
         return "TargetRunnerConfig{" +
             "connectName='" + name + '\'' +
-            ", properties=" + properties +
+            ", properties=" + components +
             '}';
     }
+
+    private boolean isEqualsComponents(List<Map<String, String>> source, List<Map<String, String>> target) {
+        if (source == null || target == null) {
+            if (source != target) {
+                return false;
+            } else {
+                return true;
+            }
+        }
+
+        if (source.isEmpty() || target.isEmpty()) {
+            if (source.isEmpty() && target.isEmpty()) {
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        if (source.size() != target.size()) {
+            return false;
+        }
+        for (int index = 0; index < source.size(); index++) {
+            Map<String, String> sourceComponent = source.get(index);
+            Map<String, String> targetComponent = target.get(index);
+            if (sourceComponent.size() != targetComponent.size()) {
+                return false;
+            }
+            for (Map.Entry<String, String> entry : sourceComponent.entrySet()) {
+                String element = targetComponent.get(entry.getKey());
+                if (element == null && entry.getValue() == null) {
+                    return true;
+                } else if (element.equals(entry.getValue())) {
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
index 27269b8..b963c72 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -17,51 +17,78 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
 
+import com.google.common.collect.Maps;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Type;
 import java.nio.file.FileSystems;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardWatchEventKinds;
 import java.nio.file.WatchKey;
 import java.nio.file.WatchService;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.springframework.beans.factory.annotation.Value;
+import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
 
 @Slf4j
 public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver {
 
-    @Value("${runtimer.storePathRootDir:}")
-    private String storeRootPath;
-
-    @Value("${runtimer.store.targetRunner.config:targetRunner-config}")
-    private String fileName;
+    private String pathName;
 
     private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
         ThreadUtils.newThreadFactory("TargetRunnerConfigOnFileObserver", false));
 
-    public TargetRunnerConfigOnFileObserver(Plugin plugin) {
-        this.addListen(storeRootPath, fileName, this);
+    public TargetRunnerConfigOnFileObserver(String pathName) {
+        this.pathName = pathName;
+        super.getTargetRunnerConfig().addAll(getLatestTargetRunnerConfig());
+        this.addListen(pathName, this);
+    }
+
+    public TargetRunnerConfigOnFileObserver() {
+        super.getTargetRunnerConfig().addAll(getLatestTargetRunnerConfig());
+        this.addListen(pathName, this);
     }
 
     @Override
     public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
-        return null;
+        String config = null;
+        try {
+            File file = new File(pathName);
+            config = FileUtils.readFileToString(file, "UTF-8");
+            Type workerConfigType = new TypeToken<HashSet<TargetRunnerConfig>>() {
+            }.getType();
+            Set<TargetRunnerConfig> taskConfigList = new Gson().fromJson(config, workerConfigType);
+            return taskConfigList;
+        } catch (IOException e) {
+            throw new EventBridgeException("Load component properties failed.", e);
+        } catch (Throwable e) {
+            log.error("fail to parse config={} from file={}", config, pathName);
+            throw e;
+        }
     }
 
-    public void addListen(String pathStr, String fileName,
+    public void addListen(String pathName,
         TargetRunnerConfigOnFileObserver pusherConfigOnFileService) {
-        log.info("Watching task file changing:{}", pathStr + fileName);
+        log.info("Watching task file changing:{}", pathName);
+        int index = pathName.lastIndexOf("/");
+        String filePath = pathName.substring(0, index);
+        String fileName = pathName.substring(index + 1);
         service.scheduleAtFixedRate(() -> {
             try {
                 WatchService watchService = FileSystems.getDefault()
                     .newWatchService();
-                Path path = Paths.get(pathStr);
+                Path path = Paths.get(filePath);
                 path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
                     StandardWatchEventKinds.ENTRY_MODIFY);
                 WatchKey watchKey;
@@ -81,12 +108,32 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig
     }
 
     public void diff() {
-        Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
-        Set<TargetRunnerConfig> last = super.getTargetRunnerConfig();
-        TargetRunnerConfig changed = null;
-        super.onAddTargetRunner(changed);
-        super.onUpdateTargetRunner(changed);
-        super.onDeleteTargetRunner(changed);
+        Map<String, TargetRunnerConfig> lastMap = toMap(super.getTargetRunnerConfig());
+        Map<String, TargetRunnerConfig> latestMap = toMap(this.getLatestTargetRunnerConfig());
+        lastMap.entrySet().forEach(entry -> {
+            TargetRunnerConfig latest = latestMap.get(entry.getKey());
+            if (latest == null) {
+                super.onDeleteTargetRunner(entry.getValue());
+            } else if (!latest.equals(entry.getValue())) {
+                super.onUpdateTargetRunner(entry.getValue());
+            }
+        });
+
+        latestMap.entrySet().forEach(entry -> {
+            TargetRunnerConfig latest = lastMap.get(entry.getKey());
+            if (latest == null) {
+                super.onAddTargetRunner(entry.getValue());
+            }
+        });
+    }
+
+    private Map<String, TargetRunnerConfig> toMap(Set<TargetRunnerConfig> targetRunnerConfigs) {
+        if (targetRunnerConfigs == null || targetRunnerConfigs.isEmpty()) {
+            return Maps.newHashMapWithExpectedSize(0);
+        }
+        Map<String, TargetRunnerConfig> map = Maps.newHashMap();
+        targetRunnerConfigs.forEach(entry -> map.put(entry.getName(), entry));
+        return map;
     }
 
 }
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/resources/target-runner.config b/adapter/runtimer/src/main/resources/target-runner.config
deleted file mode 100644
index 04458c6..0000000
--- a/adapter/runtimer/src/main/resources/target-runner.config
+++ /dev/null
@@ -1,7 +0,0 @@
-[
-    {
-    },
-    {
-
-    }
-]
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/resources/target-runner.json b/adapter/runtimer/src/main/resources/target-runner.json
new file mode 100644
index 0000000..11b5088
--- /dev/null
+++ b/adapter/runtimer/src/main/resources/target-runner.json
@@ -0,0 +1,23 @@
+[
+    {
+
+        "name":"xxxxx",
+        "components":[ {
+          "class" : "org.apache.rocketmq.connect.FileStream",
+          "path" : "xxxxxxxx",
+          "name" : "demo"
+        },
+        {
+          "class" : "org.apache.rocketmq.connect.transforms.PatternRename",
+          "pattern" : "company",
+          "replacement": "company02"
+        },
+       {
+          "class" : "org.apache.rocketmq.connect.HttpSinkTask",
+          "url" : "http://xxxxx/demo"
+        } ],
+        "runOptions":{
+            "taskSize":1
+        }
+    }
+]
\ No newline at end of file
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigOnFileObserverTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigOnFileObserverTest.java
new file mode 100644
index 0000000..41f1989
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigOnFileObserverTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.runtimer.service;
+
+import java.time.Duration;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnFileObserver;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.rocketmq.eventbridge.runtimer.service.TestConstants.DEMO_TARGET_RUNNER_CONFIG_FILE_NAME;
+import static org.awaitility.Awaitility.await;
+
+public class TargetRunnerConfigOnFileObserverTest {
+
+    @Test
+    public void testGetLatestTargetRunnerConfig() {
+        TargetRunnerConfigUtil.resetTargetRunner(getConfigFilePath());
+        TargetRunnerConfigOnFileObserver targetRunnerConfigOnFileObserver = new TargetRunnerConfigOnFileObserver(getConfigFilePath());
+        System.out.println(targetRunnerConfigOnFileObserver.getLatestTargetRunnerConfig());
+        Assert.assertTrue(!targetRunnerConfigOnFileObserver.getLatestTargetRunnerConfig().stream().findFirst().get().getComponents().isEmpty());
+    }
+
+    @Test
+    public void testListen_Add() throws InterruptedException {
+        String path = getConfigFilePath();
+        TestTargetRunnerListener targetRunnerListener = initTargetRunnerConfigOnFileObserver(path);
+        TargetRunnerConfigUtil.addTargetRunner(path);
+        await().atMost(Duration.ofSeconds(60)).until(() -> {
+            return targetRunnerListener.addTargetRunner && !targetRunnerListener.updateTargetRunner && !targetRunnerListener.deleteTargetRunner;
+        });
+    }
+
+    @Test
+    public void testListen_Delete() throws InterruptedException {
+        String path = getConfigFilePath();
+        TestTargetRunnerListener targetRunnerListener = initTargetRunnerConfigOnFileObserver(path);
+        TargetRunnerConfigUtil.deleteTargetRunner(path);
+        await().atMost(Duration.ofSeconds(60)).until(() -> {
+            return !targetRunnerListener.addTargetRunner && !targetRunnerListener.updateTargetRunner && targetRunnerListener.deleteTargetRunner;
+        });
+    }
+
+    @Test
+    public void testListen_Update() throws InterruptedException {
+        String path = getConfigFilePath();
+        TestTargetRunnerListener targetRunnerListener = initTargetRunnerConfigOnFileObserver(path);
+        TargetRunnerConfigUtil.updateTargetRunner(path);
+        await().atMost(Duration.ofSeconds(60)).until(() -> {
+            return !targetRunnerListener.addTargetRunner && targetRunnerListener.updateTargetRunner && !targetRunnerListener.deleteTargetRunner;
+        });
+    }
+
+    private  TestTargetRunnerListener initTargetRunnerConfigOnFileObserver(String path) throws InterruptedException {
+        TargetRunnerConfigUtil.resetTargetRunner(path);
+        TargetRunnerConfigOnFileObserver targetRunnerConfigOnFileObserver = new TargetRunnerConfigOnFileObserver(path);
+        TestTargetRunnerListener targetRunnerListener = new TestTargetRunnerListener();
+        targetRunnerConfigOnFileObserver.registerListener(targetRunnerListener);
+        Thread.sleep(3000L);
+        return targetRunnerListener;
+    }
+
+    private String getConfigFilePath() {
+        return this.getClass().getClassLoader().getResource(DEMO_TARGET_RUNNER_CONFIG_FILE_NAME).getPath();
+    }
+
+    class TestTargetRunnerListener implements TargetRunnerListener {
+        boolean addTargetRunner = false;
+        boolean updateTargetRunner = false;
+        boolean deleteTargetRunner = false;
+
+        @Override public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+            System.out.println("watch target runner");
+            this.addTargetRunner = true;
+        }
+
+        @Override public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+            System.out.println("watch update runner");
+            this.updateTargetRunner = true;
+        }
+
+        @Override public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+            System.out.println("watch delete runner");
+            this.deleteTargetRunner = true;
+        }
+    }
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigUtil.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigUtil.java
new file mode 100644
index 0000000..a837d52
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigUtil.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+
+import static org.apache.rocketmq.eventbridge.runtimer.service.TestConstants.DEMO_TARGET_RUNNER_NAME;
+
+public class TargetRunnerConfigUtil {
+    /**
+     * add target runner
+     *
+     * @param path
+     */
+
+    public static void addTargetRunner(String path) {
+        Set<TargetRunnerConfig> latest = getLatest(path);
+        latest.add(buildNewTargetRunnerConfig());
+        persistenceToFile(path, latest);
+    }
+
+    /**
+     * update target runner
+     *
+     * @param path
+     */
+    public static void updateTargetRunner(String path) {
+        Set<TargetRunnerConfig> latest = getLatest(path);
+        latest.stream().findFirst().get().getComponents().get(0).put("newKey", "newValue");
+        persistenceToFile(path, latest);
+    }
+
+    /**
+     * update target runner
+     *
+     * @param path
+     */
+    public static void deleteTargetRunner(String path) {
+        persistenceToFile(path, Sets.newHashSet());
+    }
+
+    /**
+     * delete target runner
+     *
+     * @param path
+     */
+    public static void resetTargetRunner(String path) {
+        TargetRunnerConfig targetRunnerConfig = buildNewTargetRunnerConfig(DEMO_TARGET_RUNNER_NAME);
+        try {
+            BufferedWriter out = new BufferedWriter(new FileWriter(path));
+            out.write(new GsonBuilder().setPrettyPrinting().create().toJson(Lists.newArrayList(targetRunnerConfig)));
+            out.close();
+        } catch (IOException e) {
+            throw new EventBridgeException("Load component properties failed.", e);
+        } catch (Throwable e) {
+            throw e;
+        }
+    }
+
+    public static Set<TargetRunnerConfig> getLatest(String path) {
+        String config = null;
+        try {
+            File file = new File(path);
+            config = FileUtils.readFileToString(file, "UTF-8");
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+        Type workerConfigType = new TypeToken<HashSet<TargetRunnerConfig>>() {
+        }.getType();
+        return new Gson().fromJson(config, workerConfigType);
+    }
+
+    private static void persistenceToFile(String path, Set<TargetRunnerConfig> taskConfigList) {
+        try {
+            BufferedWriter out = new BufferedWriter(new FileWriter(path));
+            out.write(new GsonBuilder().setPrettyPrinting().create().toJson(taskConfigList));
+            out.close();
+        } catch (IOException e) {
+            throw new EventBridgeException("Load component properties failed.", e);
+        } catch (Throwable e) {
+            throw e;
+        }
+    }
+
+    private static TargetRunnerConfig buildNewTargetRunnerConfig(String name) {
+        TargetRunnerConfig targetRunnerConfig = new TargetRunnerConfig();
+        targetRunnerConfig.setName(name);
+        List<Map<String, String>> components = Lists.newArrayList();
+        Map<String, String> component = Maps.newHashMap();
+        component.put("K1", UUID.randomUUID().toString());
+        components.add(component);
+        targetRunnerConfig.setComponents(components);
+        return targetRunnerConfig;
+    }
+
+    private static TargetRunnerConfig buildNewTargetRunnerConfig() {
+        return buildNewTargetRunnerConfig(UUID.randomUUID().toString());
+    }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TestConstants.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TestConstants.java
new file mode 100644
index 0000000..302d675
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TestConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service;
+
+public class TestConstants {
+    public static final String DEMO_TARGET_RUNNER_NAME = "demo";
+
+    public static final String DEMO_TARGET_RUNNER_CONFIG_FILE_NAME = "target-runner.json";
+}
\ No newline at end of file