You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2023/03/27 08:04:37 UTC
[rocketmq-eventbridge] branch runtimer updated: feat:add watch runner config. (#63)
This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/runtimer by this push:
new 8ab725f feat:add watch runner config. (#63)
8ab725f is described below
commit 8ab725f7d627e71f219e709af0af437de5660c03
Author: shenlin <20...@gmail.com>
AuthorDate: Mon Mar 27 16:04:30 2023 +0800
feat:add watch runner config. (#63)
feat:add watch runner config.
---
.licenserc.yaml | 1 +
adapter/runtimer/pom.xml | 49 +++++++-
.../runtimer/common/entity/TargetRunnerConfig.java | 49 +++++++-
.../service/TargetRunnerConfigOnFileObserver.java | 85 ++++++++++---
.../src/main/resources/target-runner.config | 7 --
.../runtimer/src/main/resources/target-runner.json | 23 ++++
.../TargetRunnerConfigOnFileObserverTest.java | 103 ++++++++++++++++
.../runtimer/service/TargetRunnerConfigUtil.java | 132 +++++++++++++++++++++
.../runtimer/service/TestConstants.java | 23 ++++
9 files changed, 438 insertions(+), 34 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index f01ce66..57fa815 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -24,6 +24,7 @@ header:
paths-ignore:
- 'dist'
- 'config'
+ - '**/*.json'
- 'licenses'
- '**/*.md'
- 'LICENSE'
diff --git a/adapter/runtimer/pom.xml b/adapter/runtimer/pom.xml
index 9296b42..49578b4 100644
--- a/adapter/runtimer/pom.xml
+++ b/adapter/runtimer/pom.xml
@@ -22,6 +22,14 @@
<artifactId>rocketmq-eventbridge-adapter-runtimer</artifactId>
<version>1.0.0</version>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <commons.io.version>2.8.0</commons.io.version>
+ <junit.version>4.10</junit.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>io.openmessaging</groupId>
@@ -59,11 +67,44 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>${commons.io.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-eventbridge-common</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <version>RELEASE</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <version>RELEASE</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>4.0.3</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- </properties>
</project>
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
index 45bc5f1..a2eb405 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity;
import java.io.Serializable;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import lombok.Data;
@@ -33,7 +34,7 @@ public class TargetRunnerConfig implements Serializable {
/**
* All data are reserved in this map.
*/
- private Map<String, String> properties;
+ private List<Map<String, String>> components;
@Override
public boolean equals(Object o) {
@@ -42,12 +43,12 @@ public class TargetRunnerConfig implements Serializable {
if (o == null || getClass() != o.getClass())
return false;
TargetRunnerConfig config = (TargetRunnerConfig) o;
- return Objects.equals(name, config.name) && Objects.equals(properties, config.properties);
+ return Objects.equals(name, config.name) && isEqualsComponents(components, config.getComponents());
}
@Override
public int hashCode() {
- return Objects.hash(name, properties);
+ return Objects.hash(name, components);
}
@Override
@@ -55,7 +56,47 @@ public class TargetRunnerConfig implements Serializable {
//TODO
return "TargetRunnerConfig{" +
"connectName='" + name + '\'' +
- ", properties=" + properties +
+ ", properties=" + components +
'}';
}
+
+    /**
+     * Compares two component lists position by position.
+     *
+     * <p>Two lists are equal when both are {@code null}, or when they have the same size and the
+     * map at every index is equal to its counterpart. Entry comparison is delegated to
+     * {@link Map#equals(Object)}, which checks every key/value pair and tolerates {@code null}
+     * values, so a difference in any entry of any component is detected.
+     *
+     * @param source components of this config, may be {@code null}
+     * @param target components of the other config, may be {@code null}
+     * @return {@code true} if both lists describe the same components in the same order
+     */
+    private boolean isEqualsComponents(List<Map<String, String>> source, List<Map<String, String>> target) {
+        if (source == null || target == null) {
+            // Equal only when both sides are null.
+            return source == target;
+        }
+        if (source.size() != target.size()) {
+            return false;
+        }
+        for (int index = 0; index < source.size(); index++) {
+            // Objects.equals also covers a null component inside either list.
+            if (!Objects.equals(source.get(index), target.get(index))) {
+                return false;
+            }
+        }
+        return true;
+    }
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
index 27269b8..b963c72 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -17,51 +17,78 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+import com.google.common.collect.Maps;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Type;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.springframework.beans.factory.annotation.Value;
+import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
@Slf4j
public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver {
- @Value("${runtimer.storePathRootDir:}")
- private String storeRootPath;
-
- @Value("${runtimer.store.targetRunner.config:targetRunner-config}")
- private String fileName;
+ private String pathName;
private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.newThreadFactory("TargetRunnerConfigOnFileObserver", false));
- public TargetRunnerConfigOnFileObserver(Plugin plugin) {
- this.addListen(storeRootPath, fileName, this);
+ public TargetRunnerConfigOnFileObserver(String pathName) {
+ this.pathName = pathName;
+ super.getTargetRunnerConfig().addAll(getLatestTargetRunnerConfig());
+ this.addListen(pathName, this);
+ }
+
+ public TargetRunnerConfigOnFileObserver() {
+ super.getTargetRunnerConfig().addAll(getLatestTargetRunnerConfig());
+ this.addListen(pathName, this);
}
@Override
public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
- return null;
+ String config = null;
+ try {
+ File file = new File(pathName);
+ config = FileUtils.readFileToString(file, "UTF-8");
+ Type workerConfigType = new TypeToken<HashSet<TargetRunnerConfig>>() {
+ }.getType();
+ Set<TargetRunnerConfig> taskConfigList = new Gson().fromJson(config, workerConfigType);
+ return taskConfigList;
+ } catch (IOException e) {
+ throw new EventBridgeException("Load component properties failed.", e);
+ } catch (Throwable e) {
+ log.error("fail to parse config={} from file={}", config, pathName);
+ throw e;
+ }
}
- public void addListen(String pathStr, String fileName,
+ public void addListen(String pathName,
TargetRunnerConfigOnFileObserver pusherConfigOnFileService) {
- log.info("Watching task file changing:{}", pathStr + fileName);
+ log.info("Watching task file changing:{}", pathName);
+ int index = pathName.lastIndexOf("/");
+ String filePath = pathName.substring(0, index);
+ String fileName = pathName.substring(index + 1);
service.scheduleAtFixedRate(() -> {
try {
WatchService watchService = FileSystems.getDefault()
.newWatchService();
- Path path = Paths.get(pathStr);
+ Path path = Paths.get(filePath);
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
WatchKey watchKey;
@@ -81,12 +108,32 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig
}
public void diff() {
- Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
- Set<TargetRunnerConfig> last = super.getTargetRunnerConfig();
- TargetRunnerConfig changed = null;
- super.onAddTargetRunner(changed);
- super.onUpdateTargetRunner(changed);
- super.onDeleteTargetRunner(changed);
+ Map<String, TargetRunnerConfig> lastMap = toMap(super.getTargetRunnerConfig());
+ Map<String, TargetRunnerConfig> latestMap = toMap(this.getLatestTargetRunnerConfig());
+ lastMap.entrySet().forEach(entry -> {
+ TargetRunnerConfig latest = latestMap.get(entry.getKey());
+ if (latest == null) {
+ super.onDeleteTargetRunner(entry.getValue());
+ } else if (!latest.equals(entry.getValue())) {
+ super.onUpdateTargetRunner(entry.getValue());
+ }
+ });
+
+ latestMap.entrySet().forEach(entry -> {
+ TargetRunnerConfig latest = lastMap.get(entry.getKey());
+ if (latest == null) {
+ super.onAddTargetRunner(entry.getValue());
+ }
+ });
+ }
+
+ private Map<String, TargetRunnerConfig> toMap(Set<TargetRunnerConfig> targetRunnerConfigs) {
+ if (targetRunnerConfigs == null || targetRunnerConfigs.isEmpty()) {
+ return Maps.newHashMapWithExpectedSize(0);
+ }
+ Map<String, TargetRunnerConfig> map = Maps.newHashMap();
+ targetRunnerConfigs.forEach(entry -> map.put(entry.getName(), entry));
+ return map;
}
}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/resources/target-runner.config b/adapter/runtimer/src/main/resources/target-runner.config
deleted file mode 100644
index 04458c6..0000000
--- a/adapter/runtimer/src/main/resources/target-runner.config
+++ /dev/null
@@ -1,7 +0,0 @@
-[
- {
- },
- {
-
- }
-]
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/resources/target-runner.json b/adapter/runtimer/src/main/resources/target-runner.json
new file mode 100644
index 0000000..11b5088
--- /dev/null
+++ b/adapter/runtimer/src/main/resources/target-runner.json
@@ -0,0 +1,23 @@
+[
+ {
+
+ "name":"xxxxx",
+ "components":[ {
+ "class" : "org.apache.rocketmq.connect.FileStream",
+ "path" : "xxxxxxxx",
+ "name" : "demo"
+ },
+ {
+ "class" : "org.apache.rocketmq.connect.transforms.PatternRename",
+ "pattern" : "company",
+ "replacement": "company02"
+ },
+ {
+ "class" : "org.apache.rocketmq.connect.HttpSinkTask",
+ "url" : "http://xxxxx/demo"
+ } ],
+ "runOptions":{
+ "taskSize":1
+ }
+ }
+]
\ No newline at end of file
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigOnFileObserverTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigOnFileObserverTest.java
new file mode 100644
index 0000000..41f1989
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigOnFileObserverTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.runtimer.service;
+
+import java.time.Duration;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnFileObserver;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.rocketmq.eventbridge.runtimer.service.TestConstants.DEMO_TARGET_RUNNER_CONFIG_FILE_NAME;
+import static org.awaitility.Awaitility.await;
+
+public class TargetRunnerConfigOnFileObserverTest {
+
+ @Test
+ public void testGetLatestTargetRunnerConfig() {
+ TargetRunnerConfigUtil.resetTargetRunner(getConfigFilePath());
+ TargetRunnerConfigOnFileObserver targetRunnerConfigOnFileObserver = new TargetRunnerConfigOnFileObserver(getConfigFilePath());
+ System.out.println(targetRunnerConfigOnFileObserver.getLatestTargetRunnerConfig());
+ Assert.assertTrue(!targetRunnerConfigOnFileObserver.getLatestTargetRunnerConfig().stream().findFirst().get().getComponents().isEmpty());
+ }
+
+ @Test
+ public void testListen_Add() throws InterruptedException {
+ String path = getConfigFilePath();
+ TestTargetRunnerListener targetRunnerListener = initTargetRunnerConfigOnFileObserver(path);
+ TargetRunnerConfigUtil.addTargetRunner(path);
+ await().atMost(Duration.ofSeconds(60)).until(() -> {
+ return targetRunnerListener.addTargetRunner && !targetRunnerListener.updateTargetRunner && !targetRunnerListener.deleteTargetRunner;
+ });
+ }
+
+ @Test
+ public void testListen_Delete() throws InterruptedException {
+ String path = getConfigFilePath();
+ TestTargetRunnerListener targetRunnerListener = initTargetRunnerConfigOnFileObserver(path);
+ TargetRunnerConfigUtil.deleteTargetRunner(path);
+ await().atMost(Duration.ofSeconds(60)).until(() -> {
+ return !targetRunnerListener.addTargetRunner && !targetRunnerListener.updateTargetRunner && targetRunnerListener.deleteTargetRunner;
+ });
+ }
+
+ @Test
+ public void testListen_Update() throws InterruptedException {
+ String path = getConfigFilePath();
+ TestTargetRunnerListener targetRunnerListener = initTargetRunnerConfigOnFileObserver(path);
+ TargetRunnerConfigUtil.updateTargetRunner(path);
+ await().atMost(Duration.ofSeconds(60)).until(() -> {
+ return !targetRunnerListener.addTargetRunner && targetRunnerListener.updateTargetRunner && !targetRunnerListener.deleteTargetRunner;
+ });
+ }
+
+ private TestTargetRunnerListener initTargetRunnerConfigOnFileObserver(String path) throws InterruptedException {
+ TargetRunnerConfigUtil.resetTargetRunner(path);
+ TargetRunnerConfigOnFileObserver targetRunnerConfigOnFileObserver = new TargetRunnerConfigOnFileObserver(path);
+ TestTargetRunnerListener targetRunnerListener = new TestTargetRunnerListener();
+ targetRunnerConfigOnFileObserver.registerListener(targetRunnerListener);
+ Thread.sleep(3000L);
+ return targetRunnerListener;
+ }
+
+ private String getConfigFilePath() {
+ return this.getClass().getClassLoader().getResource(DEMO_TARGET_RUNNER_CONFIG_FILE_NAME).getPath();
+ }
+
+ class TestTargetRunnerListener implements TargetRunnerListener {
+ boolean addTargetRunner = false;
+ boolean updateTargetRunner = false;
+ boolean deleteTargetRunner = false;
+
+ @Override public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ System.out.println("watch target runner");
+ this.addTargetRunner = true;
+ }
+
+ @Override public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ System.out.println("watch update runner");
+ this.updateTargetRunner = true;
+ }
+
+ @Override public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ System.out.println("watch delete runner");
+ this.deleteTargetRunner = true;
+ }
+ }
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigUtil.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigUtil.java
new file mode 100644
index 0000000..a837d52
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TargetRunnerConfigUtil.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+
+import static org.apache.rocketmq.eventbridge.runtimer.service.TestConstants.DEMO_TARGET_RUNNER_NAME;
+
+public class TargetRunnerConfigUtil {
+ /**
+ * add target runner
+ *
+ * @param path
+ */
+
+ public static void addTargetRunner(String path) {
+ Set<TargetRunnerConfig> latest = getLatest(path);
+ latest.add(buildNewTargetRunnerConfig());
+ persistenceToFile(path, latest);
+ }
+
+ /**
+ * update target runner
+ *
+ * @param path
+ */
+ public static void updateTargetRunner(String path) {
+ Set<TargetRunnerConfig> latest = getLatest(path);
+ latest.stream().findFirst().get().getComponents().get(0).put("newKey", "newValue");
+ persistenceToFile(path, latest);
+ }
+
+ /**
+ * delete target runner
+ *
+ * @param path
+ */
+ public static void deleteTargetRunner(String path) {
+ persistenceToFile(path, Sets.newHashSet());
+ }
+
+ /**
+ * reset the config file to a single demo target runner
+ *
+ * @param path
+ */
+    public static void resetTargetRunner(String path) {
+        TargetRunnerConfig targetRunnerConfig = buildNewTargetRunnerConfig(DEMO_TARGET_RUNNER_NAME);
+        // try-with-resources closes (and flushes) the writer even when write() throws,
+        // so a failed reset does not leak the file handle.
+        try (BufferedWriter out = new BufferedWriter(new FileWriter(path))) {
+            out.write(new GsonBuilder().setPrettyPrinting().create().toJson(Lists.newArrayList(targetRunnerConfig)));
+        } catch (IOException e) {
+            throw new EventBridgeException("Load component properties failed.", e);
+        }
+    }
+
+ public static Set<TargetRunnerConfig> getLatest(String path) {
+ String config = null;
+ try {
+ File file = new File(path);
+ config = FileUtils.readFileToString(file, "UTF-8");
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ Type workerConfigType = new TypeToken<HashSet<TargetRunnerConfig>>() {
+ }.getType();
+ return new Gson().fromJson(config, workerConfigType);
+ }
+
+    /**
+     * Serializes the given configs as pretty-printed JSON and overwrites the file at {@code path}.
+     *
+     * @param path file to overwrite
+     * @param taskConfigList configs to write
+     * @throws EventBridgeException if the file cannot be written
+     */
+    private static void persistenceToFile(String path, Set<TargetRunnerConfig> taskConfigList) {
+        // try-with-resources closes (and flushes) the writer even when write() throws.
+        try (BufferedWriter out = new BufferedWriter(new FileWriter(path))) {
+            out.write(new GsonBuilder().setPrettyPrinting().create().toJson(taskConfigList));
+        } catch (IOException e) {
+            throw new EventBridgeException("Load component properties failed.", e);
+        }
+    }
+
+ private static TargetRunnerConfig buildNewTargetRunnerConfig(String name) {
+ TargetRunnerConfig targetRunnerConfig = new TargetRunnerConfig();
+ targetRunnerConfig.setName(name);
+ List<Map<String, String>> components = Lists.newArrayList();
+ Map<String, String> component = Maps.newHashMap();
+ component.put("K1", UUID.randomUUID().toString());
+ components.add(component);
+ targetRunnerConfig.setComponents(components);
+ return targetRunnerConfig;
+ }
+
+ private static TargetRunnerConfig buildNewTargetRunnerConfig() {
+ return buildNewTargetRunnerConfig(UUID.randomUUID().toString());
+ }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TestConstants.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TestConstants.java
new file mode 100644
index 0000000..302d675
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/TestConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service;
+
+public class TestConstants {
+ public static final String DEMO_TARGET_RUNNER_NAME = "demo";
+
+ public static final String DEMO_TARGET_RUNNER_CONFIG_FILE_NAME = "target-runner.json";
+}
\ No newline at end of file