You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2023/03/10 12:56:32 UTC
[rocketmq-eventbridge] branch runtimer updated: Build runtimer (#56)
This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/runtimer by this push:
new e178e31 Build runtimer (#56)
e178e31 is described below
commit e178e31af031714770b27da4f9ee4a2b8fe86546
Author: Artisan <Xi...@outlook.com>
AuthorDate: Fri Mar 10 20:56:27 2023 +0800
Build runtimer (#56)
Build runtimer
---
adapter/pom.xml | 1 +
adapter/rpc/pom.xml | 5 +
.../RocketMQConnectTargetRunnerAPIImpl.java | 70 +++++-
.../RocketMQConnectTargetRunnerContext.java | 5 +
.../connect/dto/CreateSinkConnectorRequest.java | 11 +-
.../connect/dto/CreateSourceConnectorRequest.java | 7 +-
adapter/runtimer/README.md | 31 +++
adapter/{rpc => runtimer}/pom.xml | 55 +++--
.../eventbridge/adapter/runtimer/Runtimer.java | 121 +++++++++
.../adapter/runtimer/boot/EventBusListener.java | 148 +++++++++++
.../adapter/runtimer/boot/EventRuleTransfer.java | 198 +++++++++++++++
.../adapter/runtimer/boot/EventTargetPusher.java | 152 ++++++++++++
.../runtimer/boot/listener/ListenerFactory.java | 198 +++++++++++++++
.../runtimer/boot/pusher/PusherTaskContext.java | 188 ++++++++++++++
.../runtimer/boot/transfer/TransformEngine.java | 152 ++++++++++++
.../runtimer/common/FileAndPropertyUtil.java | 145 +++++++++++
.../runtimer/common/FilePathConfigUtil.java | 39 +++
.../adapter/runtimer/common/LoggerName.java | 28 +++
.../adapter/runtimer/common/QueueState.java} | 13 +-
.../adapter/runtimer/common/RuntimerState.java | 26 ++
.../adapter/runtimer/common/ServiceThread.java | 133 ++++++++++
.../runtimer/common/entity/PusherTargetEntity.java | 50 ++++
.../runtimer/common/entity/TargetKeyValue.java | 162 ++++++++++++
.../adapter/runtimer/common/plugin/Plugin.java | 192 +++++++++++++++
.../runtimer/common/plugin/PluginClassLoader.java | 75 ++++++
.../runtimer/common/plugin/PluginUtils.java | 239 ++++++++++++++++++
.../runtimer/common/plugin/PluginWrapper.java | 76 ++++++
.../common/store/FileBaseKeyValueStore.java | 130 ++++++++++
.../runtimer/common/store/KeyValueStore.java | 106 ++++++++
.../common/store/MemoryBasedKeyValueStore.java | 81 ++++++
.../runtimer/config/RuntimeConfigDefine.java | 112 +++++++++
.../adapter/runtimer/config/RuntimerConfig.java | 37 +++
.../adapter/runtimer/config/TraceConfig.java | 21 ++
.../adapter/runtimer/converter/JsonConverter.java | 77 ++++++
.../adapter/runtimer/converter/ListConverter.java | 64 +++++
.../runtimer/converter/RecordOffsetConverter.java | 58 +++++
.../service/PusherConfigManageService.java | 98 ++++++++
.../service/PusherConfigManageServiceImpl.java | 272 +++++++++++++++++++++
.../runtimer/trace/JaegerTraceStrategy.java | 21 ++
.../adapter/runtimer/trace/TraceConstants.java | 21 ++
.../adapter/runtimer/trace/TraceException.java | 21 ++
.../adapter/runtimer/trace/TraceFactory.java | 31 +++
.../adapter/runtimer/trace/TraceStrategy.java} | 17 +-
.../runtimer/trace/ZipkinTraceStrategy.java | 35 +++
pom.xml | 5 +
start/pom.xml | 4 +
start/src/main/resources/application.properties | 15 +-
47 files changed, 3682 insertions(+), 64 deletions(-)
diff --git a/adapter/pom.xml b/adapter/pom.xml
index 1e7023b..d265b05 100644
--- a/adapter/pom.xml
+++ b/adapter/pom.xml
@@ -25,6 +25,7 @@
<module>persistence</module>
<module>api</module>
<module>rpc</module>
+ <module>runtimer</module>
</modules>
</project>
\ No newline at end of file
diff --git a/adapter/rpc/pom.xml b/adapter/rpc/pom.xml
index 05cb082..7f26f61 100644
--- a/adapter/rpc/pom.xml
+++ b/adapter/rpc/pom.xml
@@ -33,6 +33,10 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-eventbridge-adapter-persistence</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-eventbridge-adapter-runtimer</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-eventbridge-domain</artifactId>
@@ -62,5 +66,6 @@
<artifactId>reactor-core</artifactId>
<version>${reactor.version}</version>
</dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
index 0a478d1..fda982c 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
@@ -17,14 +17,22 @@
package org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect;
+import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
+
+import java.util.ArrayList;
import java.util.Map;
+import java.util.Objects;
+
import lombok.SneakyThrows;
import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectSourceRunnerContext;
import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectTargetRunnerContext;
import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.ActionStatusResponseEnum;
+import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.CreateSinkConnectorRequest;
import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.TransformRequest;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
import org.apache.rocketmq.eventbridge.domain.common.enums.EventTargetStatusEnum;
import org.apache.rocketmq.eventbridge.domain.model.Component;
import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
@@ -37,10 +45,13 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
private final RocketMQConnectClient rocketMQConnectClient;
+ private PusherConfigManageService pusherConfigManageService;
+
public RocketMQConnectTargetRunnerAPIImpl(EventDataRepository eventDataRepository,
- RocketMQConnectClient rocketMQConnectClient) {
+ RocketMQConnectClient rocketMQConnectClient, PusherConfigManageService pusherConfigManageService) {
super(eventDataRepository);
this.rocketMQConnectClient = rocketMQConnectClient;
+ this.pusherConfigManageService = pusherConfigManageService;
}
@SneakyThrows
@@ -52,14 +63,21 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
Map<String, Object> sinkConnectorConfig = this.parseConnectorConfig(target);
TransformRequest filterTransform = this.buildEventBridgeFilterTransform(filterPattern);
TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
- String connectorName = rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
- sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
- RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(connectorName);
+ TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
+ sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+ if(Objects.nonNull(pusherConfigManageService)){
+ pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue);
+ }else {
+ // todo delete
+ rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
+ sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+ }
+ RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
return new Gson().toJson(context);
-
}
@Override
+ @SneakyThrows
public String updateEventTargetRunner(String accountId, String name, Component source, Component target,
String filterPattern, Map<String, TransformParam> targetTransform, RunOptions runOptions, String runContext) {
String topicName = this.parseTopicName(source);
@@ -67,12 +85,20 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
Map<String, Object> sinkConnectorConfig = this.parseConnectorConfig(target);
TransformRequest filterTransform = this.buildEventBridgeFilterTransform(filterPattern);
TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
- //stop
- this.delete(runContext);
//create
- String connectorName = rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
- sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
- RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(connectorName);
+ TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
+ sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+ if(Objects.nonNull(pusherConfigManageService)){
+ pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue);
+ }else {
+ // todo delete
+ //stop
+ this.delete(runContext);
+
+ rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
+ sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+ }
+ RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
return new Gson().toJson(context);
}
@@ -104,4 +130,28 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
RocketMQConnectSourceRunnerContext.class);
return rocketMQConnectClient.start(context.getConnectorName());
}
+
+ /**
+ * Builds the sink task config: fills a CreateSinkConnectorRequest and copies its request map into a TargetKeyValue.
+ * @param name value set as the request name
+ * @param topicName value set as the request topic name
+ * @param sinkClass value set as the request connector class
+ * @param sinkConfig value set as the request connector config
+ * @param transforms value set as the request transforms
+ * @return a new TargetKeyValue holding every entry of the request map, each value converted with toString()
+ */
+ private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass, Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) {
+ CreateSinkConnectorRequest request = new CreateSinkConnectorRequest(); // the request object is only used locally to assemble the map
+ request.setName(name);
+ request.setTopicName(topicName);
+ request.setConnectorClass(sinkClass);
+ request.setConnectorConfig(sinkConfig);
+ request.setTransforms(transforms);
+ Map<String, Object> sinkTaskMap = request.getRequestObject(); // map assembled by the request from the fields set above
+ TargetKeyValue targetKeyValue = new TargetKeyValue();
+ for (String key : sinkTaskMap.keySet()) {
+ targetKeyValue.put(key, sinkTaskMap.get(key).toString()); // NOTE(review): a null map value would throw NullPointerException here - confirm values are never null
+ }
+ return targetKeyValue;
+ }
}
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/context/RocketMQConnectTargetRunnerContext.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/context/RocketMQConnectTargetRunnerContext.java
index eaf5b6c..01def4a 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/context/RocketMQConnectTargetRunnerContext.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/context/RocketMQConnectTargetRunnerContext.java
@@ -25,4 +25,9 @@ public @Data class RocketMQConnectTargetRunnerContext {
private String connectorName;
+ private String targetRunnerConfig;
+
+ public RocketMQConnectTargetRunnerContext(String connectorName) { // single-argument constructor: sets connectorName only
+ this.connectorName = connectorName; // targetRunnerConfig is not assigned by this constructor
+ }
}
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java
index 6f5c468..352a09f 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java
@@ -42,6 +42,10 @@ public class CreateSinkConnectorRequest extends BaseConnectorRequest {
private Map<String, Object> connectorConfig;
+ public CreateSinkConnectorRequest() { // no-argument constructor
+ super(null); // hands null to the superclass constructor, i.e. no endpoint value is supplied
+ }
+
public CreateSinkConnectorRequest(String endpoint) {
super(endpoint);
}
@@ -53,9 +57,8 @@ public class CreateSinkConnectorRequest extends BaseConnectorRequest {
public Map<String, Object> getRequestObject() {
Map<String, Object> config = Maps.newHashMap();
- config.put("connector.class", connectorClass);
- config.put("connect.topicnames", topicName);
- String sinPrefix = ".";
+ config.put("connector-class", connectorClass);
+ config.put("connect-topicname", topicName);
config.put("transforms", String.join(",", transforms.stream()
.map(TransformRequest::getName)
.collect(Collectors.toList())));
@@ -63,7 +66,7 @@ public class CreateSinkConnectorRequest extends BaseConnectorRequest {
transform.getConfig()
.entrySet()
.forEach(entry -> {
- config.put("transforms" + sinPrefix + transform.getName() + sinPrefix + entry.getKey(), entry.getValue());
+ config.put("transforms" + "-" + transform.getName() + "-" + entry.getKey(), entry.getValue());
});
});
config.putAll(connectorConfig);
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java
index 65dd520..40ffd38 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java
@@ -53,9 +53,8 @@ public class CreateSourceConnectorRequest extends BaseConnectorRequest {
public Map<String, Object> getRequestObject() {
Map<String, Object> config = Maps.newHashMap();
- config.put("connector.class", connectorClass);
- config.put("connect.topicname", topicName);
- String sourcePrefix = ".";
+ config.put("connector-class", connectorClass);
+ config.put("connect-topicname", topicName);
config.put("transforms", String.join(",", transforms.stream()
.map(TransformRequest::getName)
.collect(Collectors.toList())));
@@ -63,7 +62,7 @@ public class CreateSourceConnectorRequest extends BaseConnectorRequest {
transform.getConfig()
.entrySet()
.forEach(entry -> {
- config.put("transforms" + sourcePrefix + transform.getName() + sourcePrefix + entry.getKey(), entry.getValue());
+ config.put("transforms" + "-" + transform.getName() + "-" + entry.getKey(), entry.getValue());
});
});
config.putAll(connectorConfig);
diff --git a/adapter/runtimer/README.md b/adapter/runtimer/README.md
new file mode 100644
index 0000000..bb66b9e
--- /dev/null
+++ b/adapter/runtimer/README.md
@@ -0,0 +1,31 @@
+1, Runtimer主线程
+
+ 1.1,EventBusListener/SPI
+ 订阅事件,转发
+ EventBusOnRocketMQListener
+ Queue
+
+ 1.2 Filter&Transformer
+ EventRuleTransfer
+ CPU
+
+ 1.3 EventTargetPusher
+ IO
+
+TODO:
+OpenMessage as base
+Record
+
+ MVP edition
+
+ Run mvp demo
+
+ next meeting: next weekly, 3.5 20:00
+
+TODO:
+metrics, logger, exception
+detail
+
+
+
+
diff --git a/adapter/rpc/pom.xml b/adapter/runtimer/pom.xml
similarity index 58%
copy from adapter/rpc/pom.xml
copy to adapter/runtimer/pom.xml
index 05cb082..9296b42 100644
--- a/adapter/rpc/pom.xml
+++ b/adapter/runtimer/pom.xml
@@ -9,7 +9,8 @@
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-eventbridge-adapter</artifactId>
@@ -18,49 +19,51 @@
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>rocketmq-eventbridge-adapter-rpc</artifactId>
-
- <properties>
- <rocketmq.version>4.9.2</rocketmq.version>
- <httpclient.version>4.5.13</httpclient.version>
- <reactor.version>3.1.7.RELEASE</reactor.version>
- </properties>
+ <artifactId>rocketmq-eventbridge-adapter-runtimer</artifactId>
<version>1.0.0</version>
<dependencies>
- <!-- Project Modules -->
<dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-eventbridge-adapter-persistence</artifactId>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ <version>0.3.1-alpha</version>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-eventbridge-domain</artifactId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>4.9.2</version>
+ <scope>compile</scope>
</dependency>
- <!-- Framework-->
<dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-autoconfigure</artifactId>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>0.1.2</version>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
- <artifactId>spring-web</artifactId>
+ <artifactId>spring-context</artifactId>
</dependency>
- <!-- Tools-->
<dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>${rocketmq.version}</version>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>${httpclient.version}</version>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ <version>0.9.11</version>
+ <scope>compile</scope>
</dependency>
<dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-core</artifactId>
- <version>${reactor.version}</version>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
</dependency>
</dependencies>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
</project>
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
new file mode 100644
index 0000000..6c5095a
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer;
+
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventBusListener;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventRuleTransfer;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventTargetPusher;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Event bridge runtimer: builds the listener, transfer and pusher from the task configs and starts them.
+ * Its run() method also schedules pusherConfigManageService.persist() at a fixed rate of 500 ms.
+ * @author artisan
+ */
+@Component
+public class Runtimer extends ServiceThread{
+
+ private static final Logger logger = LoggerFactory.getLogger(Runtimer.class);
+
+ private AtomicReference<RuntimerState> runtimerState; // assigned in startRuntimer(); null until then
+
+ private Plugin plugin; // passed on to EventRuleTransfer and EventTargetPusher
+
+ private ListenerFactory listenerFactory; // shared by the listener, transfer and pusher created in initAndStart()
+
+ private PusherConfigManageService pusherConfigManageService; // source of the task configs and target of the periodic persist()
+
+ private Map<String, List<TargetKeyValue>> taskConfigs = new HashMap<>(); // replaced in initAndStart() by getTaskConfigs()
+
+ private EventBusListener listener;
+
+ private EventRuleTransfer transfer;
+
+ private EventTargetPusher pusher;
+
+ private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((Runnable r) -> new Thread(r, "RuntimerScheduledThread")); // NOTE(review): never shut down in the code visible here
+
+ public Runtimer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService configManageService) { // stores the three collaborators; no other work is done here
+ this.plugin = plugin;
+ this.listenerFactory = listenerFactory;
+ this.pusherConfigManageService = configManageService;
+ }
+
+ @PostConstruct
+ public void initAndStart() { // loads the task configs, builds the three stages, then starts the runtimer
+ logger.info("init runtimer task config");
+ this.taskConfigs = pusherConfigManageService.getTaskConfigs();
+ listener = new EventBusListener(listenerFactory, pusherConfigManageService);
+ listener.initOrUpdateListenConsumer(taskConfigs);
+ transfer = new EventRuleTransfer(plugin, listenerFactory, pusherConfigManageService);
+ transfer.initOrUpdateTaskTransform(taskConfigs);
+ pusher = new EventTargetPusher(plugin, listenerFactory, pusherConfigManageService);
+ pusher.initOrUpdatePusherTask(taskConfigs);
+ startRuntimer();
+ }
+
+ public void startRuntimer() { // sets the state to START and calls start()
+ runtimerState = new AtomicReference<>(RuntimerState.START);
+ this.start(); // presumably inherited from ServiceThread and expected to invoke run() - TODO confirm
+ }
+
+ @Override
+ public String getServiceName() { // returns the simple class name "Runtimer"
+ return Runtimer.class.getSimpleName();
+ }
+
+ @Override
+ public void run() { // starts listener, transfer and pusher in that order, then schedules the periodic persist
+
+ listener.start();
+
+ transfer.start();
+
+ pusher.start();
+
+ scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ this.pusherConfigManageService.persist();
+ } catch (Exception e) { // caught and logged so an exception does not cancel the subsequent scheduled runs
+ logger.error("schedule persist config error.", e);
+ }
+ }, 500, 500, TimeUnit.MILLISECONDS); // initial delay 500 ms, period 500 ms
+
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
new file mode 100644
index 0000000..dfeb3e5
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.MapUtils;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.QueueState;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Polls every lite pull consumer for messages and hands each message to ListenerFactory.offerListenEvent.
+ * @author artisan
+ */
+public class EventBusListener extends ServiceThread {
+
+ private Logger logger = LoggerFactory.getLogger(EventBusListener.class);
+
+ private final ConcurrentHashMap<MessageQueue, Long> messageQueuesOffsetMap; // created in the constructor; not otherwise used in this class
+
+ private final ConcurrentHashMap<MessageQueue, QueueState> messageQueuesStateMap; // created in the constructor; not otherwise used in this class
+
+ private List<String> topics = new CopyOnWriteArrayList<>(); // topics collected so far; a List, so the same topic can appear more than once
+
+ private List<DefaultLitePullConsumer> listenConsumer = new CopyOnWriteArrayList<>(); // consumers polled by run()
+
+ private ListenerFactory listenerFactory;
+
+ private PusherConfigManageService pusherConfigManageService;
+
+ private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100)); // core 20, max 60 threads, keep-alive 1000 microseconds, queue capacity 100. NOTE(review): MICROSECONDS may be a typo for MILLISECONDS - confirm
+
+ private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000); // not used anywhere in this class
+
+ public EventBusListener(ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){ // also registers a config-update listener on the service
+ this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256);
+ this.messageQueuesStateMap = new ConcurrentHashMap<>(256);
+ this.listenerFactory = listenerFactory;
+ this.pusherConfigManageService = pusherConfigManageService;
+ this.pusherConfigManageService.registerListener(new ConsumerUpdateListenerImpl());
+ }
+
+ /**
+ * Adds the topics parsed from taskConfig to the topic list and creates one pull consumer per entry of that list.
+ * @param taskConfig connect name mapped to its target configs; when empty, only a warning is logged
+ */
+ public void initOrUpdateListenConsumer(Map<String, List<TargetKeyValue>> taskConfig){
+ if(MapUtils.isEmpty(taskConfig)){
+ logger.warn("initListenConsumer by taskConfig param is empty");
+ return;
+ }
+ List<TargetKeyValue> targetKeyValues = initTaskKeyInfo(taskConfig);
+ this.topics.addAll(listenerFactory.parseTopicListByList(targetKeyValues));
+ for (String topic : topics){ // NOTE(review): walks the whole list, so a later call creates consumers again for topics added earlier - confirm intended
+ DefaultLitePullConsumer pullConsumer = listenerFactory.initDefaultMQPullConsumer(topic);
+ listenConsumer.add(pullConsumer);
+ }
+ logger.info("init or update consumer succeed , consumer is - {}", JSON.toJSONString(listenConsumer));
+ }
+
+ /**
+ * Collects the target configs of every connect name in taskConfig into a single list.
+ * @param taskConfig connect name mapped to its target configs
+ * @return the configs gathered through a HashSet, so duplicates are dropped and order is not guaranteed
+ */
+ private List<TargetKeyValue> initTaskKeyInfo(Map<String, List<TargetKeyValue>> taskConfig) {
+ Set<TargetKeyValue> targetKeyValues = new HashSet<>();
+ for(String connectName : taskConfig.keySet()){
+ targetKeyValues.addAll(taskConfig.get(connectName));
+ }
+ return Lists.newArrayList(targetKeyValues);
+ }
+
+ @Override
+ public void run() { // loops until stopped; each pass submits one poll task per consumer
+ while (!stopped){ // stopped is not declared in this class - presumably inherited from ServiceThread
+ if(CollectionUtils.isEmpty(listenConsumer)){
+ logger.info("current listen consumer is empty");
+ this.waitForRunning(1000); // presumably pauses before the next check - TODO confirm against ServiceThread
+ continue;
+ }
+ for(DefaultLitePullConsumer pullConsumer : listenConsumer) {
+ executorService.submit(() -> { // NOTE(review): submitted on every pass with no pause; a full pool and queue would reject the task with the default handler - confirm
+ try {
+ List<MessageExt> messageExts = pullConsumer.poll(3000); // timeout argument 3000, presumably milliseconds
+ if (CollectionUtils.isEmpty(messageExts)) {
+ logger.info("consumer poll message empty , consumer - {}", JSON.toJSONString(pullConsumer));
+ return;
+ }
+ for (MessageExt messageExt : messageExts) {
+ listenerFactory.offerListenEvent(messageExt);
+ logger.debug("consumer - {} - offer listen event - {}", JSON.toJSONString(pullConsumer), JSON.toJSON(messageExt));
+ }
+ } finally {
+ pullConsumer.commitSync(); // runs on every exit path, including the empty-poll return and a thrown exception
+ }
+ });
+ }
+ }
+ }
+
+ @Override
+ public String getServiceName() { // returns the simple name of the runtime class
+ return this.getClass().getSimpleName();
+ }
+
+ /**
+ * Config-update listener: wraps the changed target in a one-entry map and calls initOrUpdateListenConsumer.
+ */
+ class ConsumerUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
+
+ @Override
+ public void onConfigUpdate(PusherTargetEntity targetEntity) {
+ logger.info("consumer update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
+ Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
+ lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
+ initOrUpdateListenConsumer(lastTargetMap); // method of the enclosing EventBusListener instance
+ }
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
new file mode 100644
index 0000000..3be7b8a
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * receive event and transfer the rule to pusher
+ */
+public class EventRuleTransfer extends ServiceThread {
+
+ private static final Logger logger = LoggerFactory.getLogger(EventRuleTransfer.class);
+
+ private ListenerFactory listenerFactory;
+
+ private PusherConfigManageService pusherConfigManageService;
+
+ private Plugin plugin;
+
+ Map<TargetKeyValue/*taskConfig*/, TransformEngine<ConnectRecord>/*taskTransform*/> taskTransformMap = new ConcurrentHashMap<>(20);
+
+ private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
+
+ private ExecutorService singleExecutor = Executors.newSingleThreadScheduledExecutor();
+
+ public EventRuleTransfer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
+ this.plugin = plugin;
+ this.listenerFactory = listenerFactory;
+ this.pusherConfigManageService = pusherConfigManageService;
+ this.pusherConfigManageService.registerListener(new TransformUpdateListenerImpl());
+ }
+
+ public void initOrUpdateTaskTransform(Map<String, List<TargetKeyValue>> taskConfig){
+ this.taskTransformMap.putAll(initSinkTaskTransformInfo(taskConfig));
+ }
+
+ private static final Set<String> MQ_SYS_KEYS = new HashSet<String>() {
+ {
+ add("MIN_OFFSET");
+ add("TRACE_ON");
+ add("MAX_OFFSET");
+ add("MSG_REGION");
+ add("UNIQ_KEY");
+ add("WAIT");
+ add("TAGS");
+ }
+ };
+
+ @Override
+ public String getServiceName() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public void run() {
+ while (!stopped){
+ MessageExt messageExt = listenerFactory.takeListenerEvent();
+ if(Objects.isNull(messageExt)){
+ logger.info("listen message is empty, continue by curTime - {}", System.currentTimeMillis());
+ this.waitForRunning(1000);
+ continue;
+ }
+ executorService.submit(() -> {
+ ConnectRecord connectRecord = convertToSinkDataEntry(messageExt);
+ // extension add sub
+ // rule - target
+ for (TargetKeyValue targetKeyValue : taskTransformMap.keySet()){
+ // add threadPool for cup task
+ // attention coreSize
+ TransformEngine<ConnectRecord> transformEngine = taskTransformMap.get(targetKeyValue);
+ ConnectRecord transformRecord = transformEngine.doTransforms(connectRecord);
+ if(Objects.isNull(transformRecord)){
+ continue;
+ }
+ // a bean for maintain
+ Map<TargetKeyValue,ConnectRecord> targetMap = new HashMap<>();
+ targetMap.put(targetKeyValue, transformRecord);
+ listenerFactory.offerTargetTaskQueue(targetMap);
+
+ logger.debug("offer target task queue succeed, targetMap - {}", JSON.toJSONString(targetMap));
+ // metrics
+ // logger
+ // key->connectKeyValue to simple name
+ // connectRecord add system properties for taskClass info
+ }
+ });
+ }
+ }
+
+ /**
+ * Init sink task transform map
+ * key: task config
+ * value: transformEngine
+ * @param taskConfig
+ * @return
+ */
+ private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(Map<String, List<TargetKeyValue>> taskConfig) {
+ Map<TargetKeyValue, TransformEngine<ConnectRecord>> curTaskTransformMap = new HashMap<>();
+ Set<TargetKeyValue> allTaskKeySet = new HashSet<>();
+ for(String connectName : taskConfig.keySet()){
+ List<TargetKeyValue> taskKeyList = taskConfig.get(connectName);
+ allTaskKeySet.addAll(new HashSet<>(taskKeyList));
+ }
+ for(TargetKeyValue keyValue : allTaskKeySet){
+ TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(keyValue, plugin);
+ curTaskTransformMap.put(keyValue, transformChain);
+ }
+ logger.info("init sink task transform info succeed, transform map - {}", JSON.toJSONString(curTaskTransformMap));
+ return curTaskTransformMap;
+ }
+
+ /**
+ * MessageExt convert to connect record
+ * @param message
+ * @return
+ */
+ private ConnectRecord convertToSinkDataEntry(MessageExt message) {
+ Map<String, String> properties = message.getProperties();
+ Schema schema;
+ Long timestamp;
+ ConnectRecord sinkDataEntry;
+ String connectTimestamp = properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
+ timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
+ String connectSchema = properties.get(RuntimeConfigDefine.CONNECT_SCHEMA);
+ schema = StringUtils.isNotEmpty(connectSchema) ? JSON.parseObject(connectSchema, Schema.class) : null;
+ byte[] body = message.getBody();
+ RecordPartition recordPartition = listenerFactory.convertToRecordPartition(message.getTopic(), message.getBrokerName(), message.getQueueId());
+ RecordOffset recordOffset = listenerFactory.convertToRecordOffset(message.getQueueOffset());
+ String bodyStr = new String(body, StandardCharsets.UTF_8);
+ sinkDataEntry = new ConnectRecord(recordPartition, recordOffset, timestamp, schema, bodyStr);
+ KeyValue keyValue = new DefaultKeyValue();
+ if (MapUtils.isNotEmpty(properties)) {
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (MQ_SYS_KEYS.contains(entry.getKey())) {
+ keyValue.put("MQ-SYS-" + entry.getKey(), entry.getValue());
+ } else if (entry.getKey().startsWith("connect-ext-")) {
+ keyValue.put(entry.getKey().replaceAll("connect-ext-", ""), entry.getValue());
+ } else {
+ keyValue.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ sinkDataEntry.addExtension(keyValue);
+ return sinkDataEntry;
+ }
+
+ /**
+ * transform update listener
+ */
+ class TransformUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
+
+ @Override
+ public void onConfigUpdate(PusherTargetEntity targetEntity) {
+ logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
+ Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
+ lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
+ initOrUpdateTaskTransform(lastTargetMap);
+ }
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
new file mode 100644
index 0000000..105c4b6
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.collect.Lists;
+import io.netty.util.internal.ConcurrentSet;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import lombok.SneakyThrows;
+import org.apache.commons.collections.MapUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * event target push to sink task
+ * @author artisan
+ */
+public class EventTargetPusher extends ServiceThread {
+
+ private static final Logger logger = LoggerFactory.getLogger(EventTargetPusher.class);
+
+ private Set<Runnable> runningTasks = new ConcurrentSet<>();
+
+ private Set<Runnable> errorTasks = new ConcurrentSet<>();
+
+ private Set<Runnable> stoppedTasks = new ConcurrentSet<>();
+
+ private Plugin plugin;
+
+ private ListenerFactory listenerFactory;
+
+ private PusherConfigManageService pusherConfigManageService;
+
+ private List<SinkTask> pusherTasks = new CopyOnWriteArrayList<>();
+
+ public EventTargetPusher(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
+ this.plugin = plugin;
+ this.listenerFactory = listenerFactory;
+ this.pusherConfigManageService = pusherConfigManageService;
+ this.pusherConfigManageService.registerListener(new TargetUpdateListenerImpl());
+ }
+
+ /**
+ * init running tasks
+ * @param taskConfig
+ */
+ public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig){
+ Set<TargetKeyValue> taskProperty = new HashSet<>();
+ for(String connectName : taskConfig.keySet()){
+ List<TargetKeyValue> targetKeyValues = taskConfig.get(connectName);
+ taskProperty.addAll(new HashSet<>(targetKeyValues));
+ }
+ for(TargetKeyValue targetKeyValue : taskProperty){
+ try{
+ String taskClass = targetKeyValue.getString(RuntimeConfigDefine.TASK_CLASS);
+ ClassLoader loader = plugin.getPluginClassLoader(taskClass);
+ Class taskClazz;
+ boolean isolationFlag = false;
+ if (loader instanceof PluginClassLoader) {
+ taskClazz = ((PluginClassLoader) loader).loadClass(taskClass, false);
+ isolationFlag = true;
+ } else {
+ taskClazz = Class.forName(taskClass);
+ }
+ SinkTask sinkTask = (SinkTask) taskClazz.getDeclaredConstructor().newInstance();
+ sinkTask.init(targetKeyValue);
+ PusherTaskContext sinkTaskContext = new PusherTaskContext(targetKeyValue);
+ sinkTask.start(sinkTaskContext);
+ pusherTasks.add(sinkTask);
+ if (isolationFlag) {
+ Plugin.compareAndSwapLoaders(loader);
+ }
+ logger.info("init target task succeed, target key - {}", JSON.toJSONString(targetKeyValue));
+ }catch (Exception exception){
+ exception.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ while (!stopped){
+ Map<TargetKeyValue, ConnectRecord> taskPusher = listenerFactory.takeTargetMap();
+ if(MapUtils.isEmpty(taskPusher)){
+ logger.info("current target pusher is empty");
+ this.waitForRunning(1000);
+ continue;
+ }
+ logger.info("start push content by pusher - {}", JSON.toJSONString(taskPusher));
+
+ TargetKeyValue targetKeyValue = taskPusher.keySet().iterator().next();
+ // task-id for unique-key at ConnectKeyValue
+ // ConnectKeyValue -> new class for name
+ // also add in ConnectRecord class system property
+ String taskPushName = targetKeyValue.getString(RuntimeConfigDefine.TASK_CLASS);
+ // add thread pool
+ for(SinkTask sinkTask : pusherTasks){
+ if(sinkTask.getClass().getName().equals(taskPushName)){
+ sinkTask.put(Lists.newArrayList(taskPusher.get(targetKeyValue)));
+ }
+ }
+ }
+ }
+
+ @Override
+ public String getServiceName() {
+ return EventTargetPusher.class.getSimpleName();
+ }
+
+ /**
+ * target update listener
+ */
+ class TargetUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
+
+ @Override
+ public void onConfigUpdate(PusherTargetEntity targetEntity) {
+ logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
+ Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
+ lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
+ initOrUpdatePusherTask(lastTargetMap);
+ }
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
new file mode 100644
index 0000000..33804b4
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@Component
+public class ListenerFactory {
+
+ private final static Logger logger = LoggerFactory.getLogger(LoggerName.EventBus_Listener);
+
+ private static final String SEMICOLON = ";";
+
+ private static final String SYS_DEFAULT_GROUP = "default-%s-group";
+
+ public static final String QUEUE_OFFSET = "queueOffset";
+
+ private BlockingQueue<PusherTargetEntity> pusherTargetQueue = new LinkedBlockingQueue<>(1000);
+
+ private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000);
+
+ private BlockingQueue<Map<TargetKeyValue, ConnectRecord>> targetQueue = new LinkedBlockingQueue<>(50000);
+
+ @Value("${rocketmq.namesrvAddr:}")
+ private String namesrvAddr;
+
+ public DefaultLitePullConsumer initDefaultMQPullConsumer(String topic) {
+ DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
+ try {
+ consumer.setConsumerGroup(String.format(SYS_DEFAULT_GROUP, topic));
+ consumer.setNamesrvAddr(namesrvAddr);
+ consumer.subscribe(topic, "*");
+ consumer.start();
+ }catch (Exception exception){
+ logger.error("init default pull consumer exception, topic -" + topic + "-stackTrace-", exception);
+ }
+ return consumer;
+ }
+
+ public PusherTargetEntity takeTaskConfig(){
+ if(pusherTargetQueue.isEmpty()){
+ return null;
+ }
+ try {
+ return pusherTargetQueue.take();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ /**
+ * Offer listener event
+ * @param messageExt
+ * @return
+ */
+ public boolean offerListenEvent(MessageExt messageExt){
+ return eventMessage.offer(messageExt);
+ }
+
+ public MessageExt takeListenerEvent() {
+ if(eventMessage.isEmpty()){
+ return null;
+ }
+ try {
+ return eventMessage.take();
+ }catch (Exception exception){
+ exception.printStackTrace();
+ }
+ return null;
+ }
+
+ public String createInstance(String servers) {
+ String[] serversArray = servers.split(";");
+ List<String> serversList = new ArrayList<String>();
+ for (String server : serversArray) {
+ if (!serversList.contains(server)) {
+ serversList.add(server);
+ }
+ }
+ Collections.sort(serversList);
+ return String.valueOf(serversList.toString().hashCode());
+ }
+
+ /**
+ * parse topic list by task config
+ * @param taskConfig
+ * @return
+ */
+ public List<String> parseTopicList(TargetKeyValue taskConfig) {
+ String messageQueueStr = taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME);
+ if (StringUtils.isBlank(messageQueueStr)) {
+ return null;
+ }
+ List<String> topicList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
+ return Lists.newArrayList(new HashSet<>(topicList));
+ }
+
+ /**
+ * parse topic list by task config list
+ * @param taskConfigs
+ * @return
+ */
+ public List<String> parseTopicListByList(List<TargetKeyValue> taskConfigs) {
+ Set<String> allTopicList = Sets.newHashSet();
+ for(TargetKeyValue taskConfig : taskConfigs){
+ String messageQueueStr = taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME);
+ if (StringUtils.isBlank(messageQueueStr)) {
+ continue;
+ }
+ List<String> topicList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
+ allTopicList.addAll(topicList);
+ }
+ return Lists.newArrayList(allTopicList);
+ }
+
+ /**
+ * parse msg queue by queue json
+ * @param messageQueueStr
+ * @return
+ */
+ public MessageQueue parseMessageQueueList(String messageQueueStr) {
+ List<String> messageQueueStrList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
+ if (CollectionUtils.isEmpty(messageQueueStrList) || messageQueueStrList.size() != 3) {
+ return null;
+ }
+ return new MessageQueue(messageQueueStrList.get(0), messageQueueStrList.get(1), Integer.valueOf(messageQueueStrList.get(2)));
+ }
+
+ public RecordPartition convertToRecordPartition(String topic, String brokerName, int queueId) {
+ Map<String, String> map = new HashMap<>();
+ map.put("topic", topic);
+ map.put("brokerName", brokerName);
+ map.put("queueId", queueId + "");
+ RecordPartition recordPartition = new RecordPartition(map);
+ return recordPartition;
+ }
+
+ public RecordOffset convertToRecordOffset(Long offset) {
+ Map<String, String> offsetMap = new HashMap<>();
+ offsetMap.put(QUEUE_OFFSET, offset + "");
+ RecordOffset recordOffset = new RecordOffset(offsetMap);
+ return recordOffset;
+ }
+
+ public boolean offerTargetTaskQueue(Map<TargetKeyValue, ConnectRecord> targetMap){
+ return targetQueue.offer(targetMap);
+ }
+
+ public Map<TargetKeyValue, ConnectRecord> takeTargetMap(){
+ if(targetQueue.isEmpty()){
+ return null;
+ }
+ try{
+ return targetQueue.take();
+ }catch (Exception exception){
+ exception.printStackTrace();
+ }
+ return null;
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PusherTaskContext.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PusherTaskContext.java
new file mode 100644
index 0000000..dc17336
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PusherTaskContext.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.QueueState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PusherTaskContext implements SinkTaskContext {
+
+ /**
+ * The configs of current sink task.
+ */
+ private final TargetKeyValue taskConfig;
+
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+
+ private final Map<MessageQueue, Long> messageQueuesOffsetMap = new ConcurrentHashMap<>(64);
+
+ private final Map<MessageQueue, QueueState> messageQueuesStateMap = new ConcurrentHashMap<>(64);
+
+ public static final String BROKER_NAME = "brokerName";
+ public static final String QUEUE_ID = "queueId";
+ public static final String TOPIC = "topic";
+ public static final String QUEUE_OFFSET = "queueOffset";
+
+ public PusherTaskContext(TargetKeyValue taskConfig) {
+ this.taskConfig = taskConfig;
+ }
+
+ @Override
+ public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
+ if (null == recordPartition || null == recordPartition.getPartition() || null == recordOffset || null == recordOffset.getOffset()) {
+ log.warn("recordPartition {} info is null or recordOffset {} info is null", recordPartition, recordOffset);
+ return;
+ }
+ String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
+ String topic = (String) recordPartition.getPartition().get(TOPIC);
+ Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
+ if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
+ log.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+ return;
+ }
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
+ Long offset = Long.valueOf((String) recordOffset.getOffset().get(QUEUE_OFFSET));
+ if (null == offset) {
+ log.warn("resetOffset, offset is null");
+ return;
+ }
+ messageQueuesOffsetMap.put(messageQueue, offset);
+ }
+
+ @Override
+ public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
+ if (MapUtils.isEmpty(offsets)) {
+ log.warn("resetOffset, offsets {} is null", offsets);
+ return;
+ }
+ for (Map.Entry<RecordPartition, RecordOffset> entry : offsets.entrySet()) {
+ if (null == entry || null == entry.getKey() || null == entry.getKey().getPartition() || null == entry.getValue() || null == entry.getValue().getOffset()) {
+ log.warn("recordPartition {} info is null or recordOffset {} info is null, entry {}", entry);
+ continue;
+ }
+ RecordPartition recordPartition = entry.getKey();
+ String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
+ String topic = (String) recordPartition.getPartition().get(TOPIC);
+ Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
+ if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
+ log.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+ continue;
+ }
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
+ RecordOffset recordOffset = entry.getValue();
+ Long offset = Long.valueOf((String) recordOffset.getOffset().get(QUEUE_OFFSET));
+ if (null == offset) {
+ log.warn("resetOffset, offset is null");
+ continue;
+ }
+ messageQueuesOffsetMap.put(messageQueue, offset);
+ }
+ }
+
+ @Override
+ public void pause(List<RecordPartition> recordPartitions) {
+ if (recordPartitions == null || recordPartitions.size() == 0) {
+ log.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
+ return;
+ }
+ for (RecordPartition recordPartition : recordPartitions) {
+ if (null == recordPartition || null == recordPartition.getPartition()) {
+ log.warn("recordPartition {} info is null", recordPartition);
+ continue;
+ }
+ String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
+ String topic = (String) recordPartition.getPartition().get(TOPIC);
+ Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
+ if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
+ log.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+ continue;
+ }
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
+ if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
+ log.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
+ continue;
+ }
+ messageQueuesStateMap.put(messageQueue, QueueState.PAUSE);
+ }
+ }
+
+ @Override
+ public void resume(List<RecordPartition> recordPartitions) {
+ if (recordPartitions == null || recordPartitions.size() == 0) {
+ log.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
+ return;
+ }
+ for (RecordPartition recordPartition : recordPartitions) {
+ if (null == recordPartition || null == recordPartition.getPartition()) {
+ log.warn("recordPartition {} info is null", recordPartition);
+ continue;
+ }
+ String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
+ String topic = (String) recordPartition.getPartition().get(TOPIC);
+ Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
+ if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
+ log.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+ continue;
+ }
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
+ if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
+ log.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
+ continue;
+ }
+ messageQueuesStateMap.remove(messageQueue);
+ }
+ }
+
+ @Override public Set<RecordPartition> assignment() {
+ return null;
+ }
+
+ @Override public String getConnectorName() {
+ return taskConfig.getString("connectorName");
+ }
+
+ @Override public String getTaskName() {
+ return taskConfig.getString("taskId");
+ }
+
+ public Map<MessageQueue, Long> queuesOffsets() {
+ return this.messageQueuesOffsetMap;
+ }
+
+ public void cleanQueuesOffsets() {
+ this.messageQueuesOffsetMap.clear();
+ }
+
+
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
new file mode 100644
index 0000000..8c75702
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.base.Splitter;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+public class TransformEngine<R extends ConnectRecord> implements AutoCloseable {
+
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+
+ private final List<Transform> transformList;
+
+ private final KeyValue config;
+
+ private final Plugin plugin;
+
+ private static final String COMMA = ",";
+
+ private static final String PREFIX = RuntimeConfigDefine.TRANSFORMS + "-";
+
+ public TransformEngine(KeyValue config, Plugin plugin) {
+ this.config = config;
+ this.plugin = plugin;
+ transformList = new ArrayList<>(8);
+ init();
+ }
+
+ private void init() {
+ String transformsStr = config.getString(RuntimeConfigDefine.TRANSFORMS);
+ if (StringUtils.isBlank(transformsStr)) {
+ log.warn("no transforms config, {}", JSON.toJSONString(config));
+ return;
+ }
+ List<String> transformList = Splitter.on(COMMA).omitEmptyStrings().trimResults().splitToList(transformsStr);
+ if (CollectionUtils.isEmpty(transformList)) {
+ log.warn("transforms config is null, {}", JSON.toJSONString(config));
+ return;
+ }
+ transformList.stream().forEach(transformStr -> {
+ String transformClassKey = PREFIX + transformStr + "-class";
+ String transformClass = config.getString(transformClassKey);
+ try {
+ Transform transform = getTransform(transformClass);
+ KeyValue transformConfig = new DefaultKeyValue();
+ Set<String> configKeys = config.keySet();
+ for (String key : configKeys) {
+ if (key.startsWith(PREFIX + transformStr) && !key.equals(transformClassKey)) {
+ String originKey = key.replace(PREFIX + transformStr + "-", "");
+ transformConfig.put(originKey, config.getString(key));
+ }
+ }
+ transform.validate(transformConfig);
+ transform.init(transformConfig);
+ this.transformList.add(transform);
+ } catch (Exception e) {
+ log.error("transform new instance error", e);
+ }
+ });
+ }
+
+ public R doTransforms(R connectRecord) {
+ if (transformList.size() == 0) {
+ return connectRecord;
+ }
+ for (final Transform<R> transform : transformList) {
+ final R currentRecord = connectRecord;
+ connectRecord = transform.doTransform(currentRecord);
+ if (connectRecord == null) {
+ break;
+ }
+ }
+ return connectRecord;
+ }
+
+ private Transform getTransform(String transformClass) throws Exception {
+ ClassLoader loader = plugin.getPluginClassLoader(transformClass);
+ final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
+ Class transformClazz;
+ boolean isolationFlag = false;
+ if (loader instanceof PluginClassLoader) {
+ transformClazz = ((PluginClassLoader) loader).loadClass(transformClass, false);
+ isolationFlag = true;
+ } else {
+ transformClazz = Class.forName(transformClass);
+ }
+ final Transform transform = (Transform) transformClazz.getDeclaredConstructor().newInstance();
+ if (isolationFlag) {
+ Plugin.compareAndSwapLoaders(loader);
+ }
+
+ Plugin.compareAndSwapLoaders(currentThreadLoader);
+ return transform;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TransformEngine<?> that = (TransformEngine<?>) o;
+ return transformList.equals(that.transformList) && config.equals(that.config);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(transformList, config);
+ }
+
+ /**
+ * close transforms
+ *
+ * @throws Exception if this resource cannot be closed
+ */
+ @Override
+ public void close() throws Exception {
+ for (Transform transform : transformList) {
+ transform.stop();
+ }
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/FileAndPropertyUtil.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/FileAndPropertyUtil.java
new file mode 100644
index 0000000..6b121d7
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/FileAndPropertyUtil.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.Locale;
+import java.util.Properties;
+
+/**
+ * Utils for file and property.
+ */
+public class FileAndPropertyUtil {
+
+ /**
+ * Store the string to a file.
+ *
+ * @param str
+ * @param fileName
+ * @throws IOException
+ */
+ public static void string2File(final String str, final String fileName) throws IOException {
+
+ String tmpFile = fileName + ".tmp";
+ string2FileNotSafe(str, tmpFile);
+
+ String bakFile = fileName + ".bak";
+ String prevContent = file2String(fileName);
+ if (prevContent != null) {
+ string2FileNotSafe(prevContent, bakFile);
+ }
+
+ File file = new File(fileName);
+ file.delete();
+
+ file = new File(tmpFile);
+ file.renameTo(new File(fileName));
+ }
+
+ public static void string2FileNotSafe(final String str, final String fileName) throws IOException {
+ File file = new File(fileName);
+ File fileParent = file.getParentFile();
+ if (fileParent != null) {
+ fileParent.mkdirs();
+ }
+ FileWriter fileWriter = null;
+
+ try {
+ fileWriter = new FileWriter(file);
+ fileWriter.write(str);
+ } catch (IOException e) {
+ throw e;
+ } finally {
+ if (fileWriter != null) {
+ fileWriter.close();
+ }
+ }
+ }
+
+ public static String file2String(final String fileName) throws IOException {
+ File file = new File(fileName);
+ return file2String(file);
+ }
+
+ public static String file2String(final File file) throws IOException {
+ if (file.exists()) {
+ byte[] data = new byte[(int) file.length()];
+ boolean result;
+
+ FileInputStream inputStream = null;
+ try {
+ inputStream = new FileInputStream(file);
+ int len = inputStream.read(data);
+ result = len == data.length;
+ } finally {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ }
+
+ if (result) {
+ return new String(data);
+ }
+ }
+ return null;
+ }
+
+ public static void properties2Object(final Properties p, final Object object) {
+ Method[] methods = object.getClass().getMethods();
+ for (Method method : methods) {
+ String mn = method.getName();
+ if (mn.startsWith("set")) {
+ try {
+ String tmp = mn.substring(4);
+ String first = mn.substring(3, 4);
+
+ String key = first.toLowerCase() + tmp;
+ String property = p.getProperty(key);
+ if (property != null) {
+ Class<?>[] pt = method.getParameterTypes();
+ if (pt != null && pt.length > 0) {
+ String cn = pt[0].getSimpleName();
+ Object arg;
+ if ("int".equals(cn) || "Integer".equals(cn)) {
+ arg = Integer.parseInt(property);
+ } else if ("long".equals(cn) || "Long".equals(cn)) {
+ arg = Long.parseLong(property);
+ } else if ("double".equals(cn) || "Double".equals(cn)) {
+ arg = Double.parseDouble(property);
+ } else if ("boolean".equals(cn) || "Boolean".equals(cn)) {
+ arg = Boolean.parseBoolean(property);
+ } else if ("float".equals(cn) || "Float".equals(cn)) {
+ arg = Float.parseFloat(property);
+ } else if ("String".equals(cn)) {
+ arg = property;
+ } else {
+ continue;
+ }
+ method.invoke(object, arg);
+ }
+ }
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+}
+
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/FilePathConfigUtil.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/FilePathConfigUtil.java
new file mode 100644
index 0000000..9156a99
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/FilePathConfigUtil.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common;
+
+import java.io.File;
+
+/**
+ * Builds the paths of the runtimer's JSON files. Every file lives in the
+ * {@code config} directory directly below the supplied root directory.
+ */
+public class FilePathConfigUtil {
+
+    /** Name of the directory, relative to the root, that holds the JSON files. */
+    private static final String CONFIG_DIR = "config";
+
+    /** @return {@code <rootDir>/config/connectorConfig.json} using the platform separator */
+    public static String getConnectorConfigPath(final String rootDir) {
+        return inConfigDir(rootDir, "connectorConfig.json");
+    }
+
+    /** @return {@code <rootDir>/config/taskConfig.json} using the platform separator */
+    public static String getTaskConfigPath(final String rootDir) {
+        return inConfigDir(rootDir, "taskConfig.json");
+    }
+
+    /** @return {@code <rootDir>/config/position.json} using the platform separator */
+    public static String getPositionPath(final String rootDir) {
+        return inConfigDir(rootDir, "position.json");
+    }
+
+    /** @return {@code <rootDir>/config/offset.json} using the platform separator */
+    public static String getOffsetPath(final String rootDir) {
+        return inConfigDir(rootDir, "offset.json");
+    }
+
+    /** Joins root directory, config directory and file name with {@link File#separator}. */
+    private static String inConfigDir(final String rootDir, final String fileName) {
+        final String configDir = rootDir + File.separator + CONFIG_DIR;
+        return configDir + File.separator + fileName;
+    }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/LoggerName.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/LoggerName.java
new file mode 100644
index 0000000..aa1b5d9
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/LoggerName.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common;
+
+/**
+ * Defines the logger names used by the runtimer. Each constant is the name to pass
+ * to the logger factory when obtaining a logger.
+ */
+public class LoggerName {
+ /** Logger name {@code EventBridgeRuntimer}. */
+ public static final String EventBridge_RUNTIMER = "EventBridgeRuntimer";
+ /** Logger name {@code EventBusListener}. */
+ public static final String EventBus_Listener = "EventBusListener";
+ /** Logger name {@code EventRuleTransfer}. */
+ public static final String EventRule_Transfer = "EventRuleTransfer";
+ /** Logger name {@code EventTargetPusher}. */
+ public static final String EventTarget_Pusher = "EventTargetPusher";
+}
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/context/RocketMQConnectTargetRunnerContext.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/QueueState.java
similarity index 77%
copy from adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/context/RocketMQConnectTargetRunnerContext.java
copy to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/QueueState.java
index eaf5b6c..9c00e53 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/context/RocketMQConnectTargetRunnerContext.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/QueueState.java
@@ -13,16 +13,11 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
-package org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-@AllArgsConstructor
-public @Data class RocketMQConnectTargetRunnerContext {
-
- private String connectorName;
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common;
+/**
+ * State that can be recorded for a queue. {@code PAUSE} is the only value defined.
+ */
+public enum QueueState {
+ PAUSE;
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/RuntimerState.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/RuntimerState.java
new file mode 100644
index 0000000..daffff1
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/RuntimerState.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common;
+
+/**
+ * State of the runtimer: {@code START} or {@code STOP}.
+ */
+public enum RuntimerState {
+ START,
+ STOP,
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/ServiceThread.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/ServiceThread.java
new file mode 100644
index 0000000..33f4d9d
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/ServiceThread.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common;
+
+import org.apache.rocketmq.common.CountDownLatch2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class ServiceThread implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+
+ private static final long JOIN_TIME = 90 * 1000;
+
+ protected final Thread thread;
+ protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
+ protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
+ protected volatile boolean stopped = false;
+
+ public ServiceThread() {
+ this.thread = new Thread(this, this.getServiceName());
+ }
+
+ public abstract String getServiceName();
+
+ public void start() {
+ this.thread.start();
+ }
+
+ public void shutdown() {
+ this.shutdown(false);
+ }
+
+ public void shutdown(final boolean interrupt) {
+ this.stopped = true;
+ logger.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
+
+ if (hasNotified.compareAndSet(false, true)) {
+ waitPoint.countDown(); // notify
+ }
+
+ try {
+ if (interrupt) {
+ this.thread.interrupt();
+ }
+
+ long beginTime = System.currentTimeMillis();
+ if (!this.thread.isDaemon()) {
+ this.thread.join(this.getJointime());
+ }
+ long eclipseTime = System.currentTimeMillis() - beginTime;
+ logger.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
+ + this.getJointime());
+ } catch (InterruptedException e) {
+ logger.error("Interrupted", e);
+ }
+ }
+
+ public long getJointime() {
+ return JOIN_TIME;
+ }
+
+ public void stop() {
+ this.stop(false);
+ }
+
+ public void stop(final boolean interrupt) {
+ this.stopped = true;
+ logger.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
+
+ if (hasNotified.compareAndSet(false, true)) {
+ waitPoint.countDown(); // notify
+ }
+
+ if (interrupt) {
+ this.thread.interrupt();
+ }
+ }
+
+ public void makeStop() {
+ this.stopped = true;
+ logger.info("makestop thread " + this.getServiceName());
+ }
+
+ public void wakeup() {
+ if (hasNotified.compareAndSet(false, true)) {
+ waitPoint.countDown(); // notify
+ }
+ }
+
+ protected void waitForRunning(long interval) {
+ if (hasNotified.compareAndSet(true, false)) {
+ this.onWaitEnd();
+ return;
+ }
+
+ //entry to wait
+ waitPoint.reset();
+
+ try {
+ waitPoint.await(interval, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ } finally {
+ hasNotified.set(false);
+ this.onWaitEnd();
+ }
+ }
+
+ protected void onWaitEnd() {
+ }
+
+ public boolean isStopped() {
+ return stopped;
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java
new file mode 100644
index 0000000..dc7ced4
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * pusher target key config
+ */
+@Data
+public class PusherTargetEntity implements Serializable {
+
+ private String connectName;
+
+ private List<TargetKeyValue> targetKeyValues;
+
+ @Override
+ public boolean equals(Object object){
+ if (object != null && object.getClass() == this.getClass()) {
+ PusherTargetEntity targetEntity = (PusherTargetEntity) object;
+ return this.connectName.equals(targetEntity.getConnectName())
+ && this.targetKeyValues.size() == targetEntity.getTargetKeyValues().size()
+ && this.targetKeyValues.containsAll(targetEntity.getTargetKeyValues());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode(){
+ return this.connectName.hashCode() + this.targetKeyValues.hashCode();
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
new file mode 100644
index 0000000..c3c6a07
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity;
+
+import io.openmessaging.KeyValue;
+import org.springframework.util.IdGenerator;
+import org.springframework.util.SimpleIdGenerator;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Default implementation of {@link KeyValue} for the runtimer, which can be parsed by fastJson.
+ *
+ * <p>Every value is stored as a string in a single map. Equality and hash code are
+ * based on that map only; the generated {@code targetKeyId} is not part of them.
+ */
+public class TargetKeyValue implements KeyValue, Serializable {
+
+    /**
+     * Unique id, a random UUID generated when the instance is constructed.
+     */
+    private String targetKeyId;
+
+    /**
+     * All data are kept in this map.
+     */
+    private Map<String, String> properties;
+
+    public TargetKeyValue() {
+        properties = new ConcurrentHashMap<>();
+        targetKeyId = UUID.randomUUID().toString();
+    }
+
+    @Override
+    public KeyValue put(String key, int value) {
+        properties.put(key, String.valueOf(value));
+        return this;
+    }
+
+    @Override
+    public KeyValue put(String key, long value) {
+        properties.put(key, String.valueOf(value));
+        return this;
+    }
+
+    @Override
+    public KeyValue put(String key, double value) {
+        properties.put(key, String.valueOf(value));
+        return this;
+    }
+
+    /**
+     * Stores a string value. A {@code null} value is stored as the text {@code "null"}.
+     */
+    @Override
+    public KeyValue put(String key, String value) {
+        properties.put(key, String.valueOf(value));
+        return this;
+    }
+
+    /**
+     * @return the value parsed as an int, or {@code 0} if the key is absent
+     * @throws NumberFormatException if the stored value is not a valid int
+     */
+    @Override
+    public int getInt(String key) {
+        return getInt(key, 0);
+    }
+
+    /**
+     * @return the value parsed as an int, or {@code defaultValue} if the key is absent
+     * @throws NumberFormatException if the stored value is not a valid int
+     */
+    @Override
+    public int getInt(final String key, final int defaultValue) {
+        // One lookup only: a containsKey()/get() pair could see the entry vanish in between.
+        final String value = properties.get(key);
+        return value == null ? defaultValue : Integer.parseInt(value);
+    }
+
+    /**
+     * @return the value parsed as a long, or {@code 0} if the key is absent
+     * @throws NumberFormatException if the stored value is not a valid long
+     */
+    @Override
+    public long getLong(String key) {
+        return getLong(key, 0L);
+    }
+
+    /**
+     * @return the value parsed as a long, or {@code defaultValue} if the key is absent
+     * @throws NumberFormatException if the stored value is not a valid long
+     */
+    @Override
+    public long getLong(final String key, final long defaultValue) {
+        final String value = properties.get(key);
+        return value == null ? defaultValue : Long.parseLong(value);
+    }
+
+    /**
+     * @return the value parsed as a double, or {@code 0} if the key is absent
+     * @throws NumberFormatException if the stored value is not a valid double
+     */
+    @Override
+    public double getDouble(String key) {
+        return getDouble(key, 0D);
+    }
+
+    /**
+     * @return the value parsed as a double, or {@code defaultValue} if the key is absent
+     * @throws NumberFormatException if the stored value is not a valid double
+     */
+    @Override
+    public double getDouble(final String key, final double defaultValue) {
+        final String value = properties.get(key);
+        return value == null ? defaultValue : Double.parseDouble(value);
+    }
+
+    /**
+     * @return the stored value, or {@code null} if the key is absent
+     */
+    @Override
+    public String getString(String key) {
+        return properties.get(key);
+    }
+
+    /**
+     * @return the stored value, or {@code defaultValue} if the key is absent
+     */
+    @Override
+    public String getString(final String key, final String defaultValue) {
+        final String value = properties.get(key);
+        return value == null ? defaultValue : value;
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return properties.keySet();
+    }
+
+    @Override
+    public boolean containsKey(String key) {
+        return properties.containsKey(key);
+    }
+
+    /**
+     * @return the backing map itself, not a copy
+     */
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    /**
+     * Replaces the backing map with the given one; the map is used as is, not copied.
+     */
+    public void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || obj.getClass() != this.getClass()) {
+            return false;
+        }
+        TargetKeyValue keyValue = (TargetKeyValue) obj;
+        return this.properties.equals(keyValue.getProperties());
+    }
+
+    @Override
+    public int hashCode() {
+        return properties.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "TargetKeyValue{" +
+            "properties=" + properties +
+            '}';
+    }
+
+    public String getTargetKeyId() {
+        return targetKeyId;
+    }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/plugin/Plugin.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/plugin/Plugin.java
new file mode 100644
index 0000000..89a3b99
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/plugin/Plugin.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin;
+
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.component.connector.Connector;
+import io.openmessaging.connector.api.component.task.Task;
+import org.apache.commons.lang3.StringUtils;
+import org.reflections.Configuration;
+import org.reflections.Reflections;
+import org.reflections.ReflectionsException;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.*;
+
+/**
+ * Discovers plugin classes and remembers which class loader each one was found with.
+ *
+ * <p>After construction, {@link #initPlugin()} reads the comma separated locations
+ * configured under {@code runtimer.pluginpath}. Each location is scanned for subtypes
+ * of {@link Connector}, {@link Task} and {@link Transform}, and every class found is
+ * registered by class name together with the {@link PluginClassLoader} created for
+ * its location. This loader is the parent of all of those plugin class loaders.
+ */
+@Component
+public class Plugin extends URLClassLoader {
+    private static final Logger log = LoggerFactory.getLogger(Plugin.class);
+
+    @Value("${runtimer.pluginpath:}")
+    private String pluginPath;
+
+    /** Plugin class name mapped to the wrapper holding the class and its class loader. */
+    private final Map<String, PluginWrapper> classLoaderMap = new HashMap<>();
+
+    public Plugin() {
+        super(new URL[0], Plugin.class.getClassLoader());
+    }
+
+    /**
+     * Loads the plugins from every configured location.
+     */
+    @PostConstruct
+    public void initPlugin() {
+        List<String> pluginPaths = initPluginPath(this.pluginPath);
+        for (String configPath : pluginPaths) {
+            loadPlugin(configPath);
+        }
+    }
+
+    /**
+     * Splits the comma separated setting into individual locations. Surrounding
+     * whitespace is removed and empty entries are dropped.
+     */
+    private List<String> initPluginPath(String plugin) {
+        List<String> pluginPaths = new ArrayList<>();
+        if (StringUtils.isBlank(plugin)) {
+            return pluginPaths;
+        }
+        for (String path : plugin.split(",")) {
+            // "a, b" must yield "b", not " b", which would resolve to a different path.
+            String trimmed = path.trim();
+            if (!trimmed.isEmpty()) {
+                pluginPaths.add(trimmed);
+            }
+        }
+        return pluginPaths;
+    }
+
+    /**
+     * Registers the plugins at one location. A directory is expanded into its plugin
+     * locations, an archive is registered directly, and anything else is ignored with
+     * a warning.
+     */
+    private void loadPlugin(String path) {
+        final Path location = Paths.get(path).toAbsolutePath();
+        try {
+            if (Files.isDirectory(location)) {
+                for (Path pluginLocation : PluginUtils.pluginLocations(location)) {
+                    registerPlugin(pluginLocation);
+                }
+            } else if (PluginUtils.isArchive(location)) {
+                registerPlugin(location);
+            } else {
+                log.warn("plugin path is neither a directory nor an archive, ignored: {}", location);
+            }
+        } catch (IOException e) {
+            // The exception goes last with no placeholder so that SLF4J logs its stack trace.
+            log.error("register plugin error, path: {}", location, e);
+        }
+    }
+
+    /**
+     * Scans the given URLs with the given loader and registers every connector, task
+     * and transform class found.
+     */
+    private void doLoad(
+        ClassLoader loader,
+        URL[] urls
+    ) {
+        ConfigurationBuilder builder = new ConfigurationBuilder();
+        builder.setClassLoaders(new ClassLoader[] {loader});
+        builder.addUrls(urls);
+        builder.setScanners(new SubTypesScanner());
+        builder.useParallelExecutor();
+        Reflections reflections = new PluginReflections(builder);
+        getPlugin(reflections, Connector.class, loader);
+        getPlugin(reflections, Task.class, loader);
+        getPlugin(reflections, Transform.class, loader);
+    }
+
+    /**
+     * Registers every subtype of {@code klass} found by the scan under its class name.
+     *
+     * @return the classes that were registered
+     */
+    private <T> Collection<Class<? extends T>> getPlugin(
+        Reflections reflections,
+        Class<T> klass,
+        ClassLoader loader
+    ) {
+        Set<Class<? extends T>> plugins = reflections.getSubTypesOf(klass);
+        Collection<Class<? extends T>> result = new ArrayList<>();
+        for (Class<? extends T> plugin : plugins) {
+            classLoaderMap.put(plugin.getName(), new PluginWrapper(plugin, loader));
+            result.add(plugin);
+        }
+        return result;
+    }
+
+    /**
+     * Reflections variant that logs and skips a URL that cannot be scanned instead of
+     * aborting the whole scan.
+     */
+    private static class PluginReflections extends Reflections {
+
+        public PluginReflections(Configuration configuration) {
+            super(configuration);
+        }
+
+        @Override
+        protected void scan(URL url) {
+            try {
+                super.scan(url);
+            } catch (ReflectionsException e) {
+                Logger log = Reflections.log;
+                if (log != null && log.isWarnEnabled()) {
+                    log.warn("Scan url error. ignoring the exception and continuing", e);
+                }
+            }
+        }
+    }
+
+    private static PluginClassLoader newPluginClassLoader(
+        final URL pluginLocation,
+        final URL[] urls,
+        final ClassLoader parent
+    ) {
+        return AccessController.doPrivileged(
+            (PrivilegedAction<PluginClassLoader>) () -> new PluginClassLoader(pluginLocation, urls, parent)
+        );
+    }
+
+    /**
+     * Creates a class loader for one plugin location, with this loader as parent,
+     * and registers the plugin classes found there.
+     */
+    private void registerPlugin(Path pluginLocation)
+        throws IOException {
+        log.info("Loading plugin from: {}", pluginLocation);
+        List<URL> pluginUrls = new ArrayList<>();
+        for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
+            pluginUrls.add(path.toUri().toURL());
+        }
+        URL[] urls = pluginUrls.toArray(new URL[0]);
+        if (log.isDebugEnabled()) {
+            log.debug("Loading plugin urls: {}", Arrays.toString(urls));
+        }
+        PluginClassLoader loader = newPluginClassLoader(
+            pluginLocation.toUri().toURL(),
+            urls,
+            this
+        );
+        doLoad(loader, urls);
+    }
+
+    /**
+     * @param pluginName fully qualified name of a plugin class
+     * @return the class loader registered for that class, or {@code null} if the class
+     *         was not found during scanning
+     */
+    public ClassLoader getPluginClassLoader(String pluginName) {
+        PluginWrapper pluginWrapper = classLoaderMap.get(pluginName);
+        if (null != pluginWrapper) {
+            return pluginWrapper.getClassLoader();
+        }
+        return null;
+    }
+
+    /**
+     * @return the context class loader of the calling thread
+     */
+    public ClassLoader currentThreadLoader() {
+        return Thread.currentThread().getContextClassLoader();
+    }
+
+    /**
+     * Sets the calling thread's context class loader to {@code loader} if the thread
+     * currently has a context class loader and it differs from {@code loader}.
+     *
+     * @return the context class loader that was in place before the call
+     */
+    public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
+        ClassLoader current = Thread.currentThread().getContextClassLoader();
+        if (null != current && !current.equals(loader)) {
+            Thread.currentThread().setContextClassLoader(loader);
+        }
+        return current;
+    }
+
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/plugin/PluginClassLoader.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/plugin/PluginClassLoader.java
new file mode 100644
index 0000000..428351e
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/plugin/PluginClassLoader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * Class loader for a single plugin location.
+ *
+ * <p>Unlike the default parent-first delegation, {@link #loadClass(String, boolean)} first tries
+ * to find a class in this loader's own URLs and only then delegates to the parent. Classes whose
+ * names are matched by {@link PluginUtils#shouldNotLoadInIsolation(String)} skip the local lookup
+ * and always go to the parent.
+ *
+ * <p>The class registers itself as parallel capable, so class loading is guarded by the per-name
+ * lock returned from {@link #getClassLoadingLock(String)} rather than by the loader instance.
+ */
+public class PluginClassLoader extends URLClassLoader {
+    private static final Logger log = LoggerFactory.getLogger(PluginClassLoader.class);
+    private final URL pluginLocation;
+
+    static {
+        ClassLoader.registerAsParallelCapable();
+    }
+
+    /**
+     * Creates a plugin class loader that delegates to {@code parent}.
+     *
+     * @param pluginLocation top-level location of the plugin, reported by {@link #location()}
+     * @param urls URLs searched for the plugin's classes and resources
+     * @param parent parent class loader used for delegation
+     */
+    public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
+        super(urls, parent);
+        this.pluginLocation = pluginLocation;
+    }
+
+    /**
+     * Creates a plugin class loader that uses the default delegation parent of
+     * {@link URLClassLoader#URLClassLoader(URL[])}.
+     *
+     * @param pluginLocation top-level location of the plugin, reported by {@link #location()}
+     * @param urls URLs searched for the plugin's classes and resources
+     */
+    public PluginClassLoader(URL pluginLocation, URL[] urls) {
+        super(urls);
+        this.pluginLocation = pluginLocation;
+    }
+
+    /**
+     * Returns the plugin location as a string.
+     *
+     * @return string form of the plugin location URL
+     */
+    public String location() {
+        return pluginLocation.toString();
+    }
+
+    @Override
+    public String toString() {
+        return "PluginClassLoader{pluginLocation=" + pluginLocation + "}";
+    }
+
+    /**
+     * Loads a class, preferring this loader's own URLs over the parent.
+     *
+     * <p>The method is deliberately NOT declared {@code synchronized}: the loader is registered as
+     * parallel capable and the per-name class loading lock is sufficient. Holding the monitor of
+     * the whole loader on top of it would serialize all class loading through this loader and can
+     * deadlock when several loaders delegate to one another from different threads.
+     *
+     * @param name binary name of the class
+     * @param resolve whether to link the class after loading
+     * @return the loaded class
+     * @throws ClassNotFoundException if neither this loader nor the parent chain can find the class
+     */
+    @Override
+    public Class<?> loadClass(String name, boolean resolve)
+        throws ClassNotFoundException {
+        synchronized (getClassLoadingLock(name)) {
+            Class<?> klass = findLoadedClass(name);
+            if (klass == null) {
+                try {
+                    if (!PluginUtils.shouldNotLoadInIsolation(name)) {
+                        klass = findClass(name);
+                    }
+                } catch (ClassNotFoundException e) {
+                    // Not part of the plugin itself; fall through to the parent lookup below.
+                    log.trace("Class '{}' not found. Delegating to parent", name);
+                }
+            }
+            if (klass == null) {
+                klass = super.loadClass(name, false);
+            }
+            if (resolve) {
+                resolveClass(klass);
+            }
+            return klass;
+        }
+    }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/plugin/PluginUtils.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/plugin/PluginUtils.java
new file mode 100644
index 0000000..effc821
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/plugin/PluginUtils.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
+import java.util.regex.Pattern;
+
+/**
+ * Static helpers for locating plugin artifacts (directories, archives, class files) on disk and
+ * for deciding which class names a {@link PluginClassLoader} must not load in isolation.
+ */
+public class PluginUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(PluginUtils.class);
+
+    // Be specific about javax packages and exclude those existing in Java SE and Java EE libraries.
+    private static final Pattern BLACKLIST = Pattern.compile("^(?:"
+        + "java"
+        + "|javax\\.accessibility"
+        + "|javax\\.activation"
+        + "|javax\\.activity"
+        + "|javax\\.annotation"
+        + "|javax\\.batch\\.api"
+        + "|javax\\.batch\\.operations"
+        + "|javax\\.batch\\.runtime"
+        + "|javax\\.crypto"
+        + "|javax\\.decorator"
+        + "|javax\\.ejb"
+        + "|javax\\.el"
+        + "|javax\\.enterprise\\.concurrent"
+        + "|javax\\.enterprise\\.context"
+        + "|javax\\.enterprise\\.context\\.spi"
+        + "|javax\\.enterprise\\.deploy\\.model"
+        + "|javax\\.enterprise\\.deploy\\.shared"
+        + "|javax\\.enterprise\\.deploy\\.spi"
+        + "|javax\\.enterprise\\.event"
+        + "|javax\\.enterprise\\.inject"
+        + "|javax\\.enterprise\\.inject\\.spi"
+        + "|javax\\.enterprise\\.util"
+        + "|javax\\.faces"
+        + "|javax\\.imageio"
+        + "|javax\\.inject"
+        + "|javax\\.interceptor"
+        + "|javax\\.jms"
+        + "|javax\\.json"
+        + "|javax\\.jws"
+        + "|javax\\.lang\\.model"
+        + "|javax\\.mail"
+        + "|javax\\.management"
+        + "|javax\\.management\\.j2ee"
+        + "|javax\\.naming"
+        + "|javax\\.net"
+        + "|javax\\.persistence"
+        + "|javax\\.print"
+        + "|javax\\.resource"
+        + "|javax\\.rmi"
+        + "|javax\\.script"
+        + "|javax\\.security\\.auth"
+        + "|javax\\.security\\.auth\\.message"
+        + "|javax\\.security\\.cert"
+        + "|javax\\.security\\.jacc"
+        + "|javax\\.security\\.sasl"
+        + "|javax\\.servlet"
+        + "|javax\\.sound\\.midi"
+        + "|javax\\.sound\\.sampled"
+        + "|javax\\.sql"
+        + "|javax\\.swing"
+        + "|javax\\.tools"
+        + "|javax\\.transaction"
+        + "|javax\\.validation"
+        + "|javax\\.websocket"
+        + "|javax\\.ws\\.rs"
+        + "|javax\\.xml"
+        + "|javax\\.xml\\.bind"
+        + "|javax\\.xml\\.registry"
+        + "|javax\\.xml\\.rpc"
+        + "|javax\\.xml\\.soap"
+        + "|javax\\.xml\\.ws"
+        + "|org\\.ietf\\.jgss"
+        + "|org\\.omg\\.CORBA"
+        + "|org\\.omg\\.CosNaming"
+        + "|org\\.omg\\.Dynamic"
+        + "|org\\.omg\\.DynamicAny"
+        + "|org\\.omg\\.IOP"
+        + "|org\\.omg\\.Messaging"
+        + "|org\\.omg\\.PortableInterceptor"
+        + "|org\\.omg\\.PortableServer"
+        + "|org\\.omg\\.SendingContext"
+        + "|org\\.omg\\.stub\\.java\\.rmi"
+        + "|org\\.w3c\\.dom"
+        + "|org\\.xml\\.sax"
+        + "|io\\.openmessaging\\.connector\\.api"
+        + "|org\\.slf4j"
+        + ")\\..*$"
+        + "|io\\.openmessaging\\.KeyValue");
+
+    /** Accepts directories as well as files that look like archives or class files. */
+    private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER =
+        path -> Files.isDirectory(path) || isArchive(path) || isClassFile(path);
+
+    /**
+     * Tells whether the path names an archive, judged by its extension only.
+     *
+     * @param path path to inspect
+     * @return {@code true} if the path ends with {@code .jar} or {@code .zip}, ignoring case
+     */
+    public static boolean isArchive(Path path) {
+        String archivePath = path.toString().toLowerCase(Locale.ROOT);
+        return archivePath.endsWith(".jar") || archivePath.endsWith(".zip");
+    }
+
+    /**
+     * Tells whether the path names a class file, judged by its extension only.
+     *
+     * @param path path to inspect
+     * @return {@code true} if the path ends with {@code .class}, ignoring case
+     */
+    public static boolean isClassFile(Path path) {
+        return path.toString().toLowerCase(Locale.ROOT).endsWith(".class");
+    }
+
+    /**
+     * Lists the direct children of {@code topPath} that are directories, archives or class files.
+     *
+     * @param topPath directory to list
+     * @return the accepted children, in the order the directory stream returned them
+     * @throws IOException if the directory cannot be opened or read
+     */
+    public static List<Path> pluginLocations(Path topPath) throws IOException {
+        List<Path> locations = new ArrayList<>();
+        try (
+            DirectoryStream<Path> listing = Files.newDirectoryStream(
+                topPath,
+                PLUGIN_PATH_FILTER
+            )
+        ) {
+            for (Path dir : listing) {
+                locations.add(dir);
+            }
+        }
+        return locations;
+    }
+
+    /**
+     * Computes the class path entries of the plugin rooted at {@code topPath}.
+     *
+     * <p>If {@code topPath} itself is an archive it is returned alone. Otherwise the directory is
+     * walked depth-first, following symbolic links that can be resolved, and:
+     * <ul>
+     *   <li>all archives found are returned in sorted order;</li>
+     *   <li>if only class files were found, {@code topPath} itself is returned;</li>
+     *   <li>if both were found, a warning is logged and only the archives are returned.</li>
+     * </ul>
+     * Entries that turn out to be neither a directory, an archive nor a class file (for example a
+     * symbolic link named {@code x.jar} pointing to some other regular file) are skipped with a
+     * warning instead of aborting the scan.
+     *
+     * @param topPath plugin location to scan
+     * @return the class path entries; empty if nothing usable was found
+     * @throws IOException if a directory cannot be opened or closed during the walk
+     */
+    public static List<Path> pluginUrls(Path topPath) throws IOException {
+        if (isArchive(topPath)) {
+            return Collections.singletonList(topPath);
+        }
+
+        boolean containsClassFiles = false;
+        Set<Path> archives = new TreeSet<>();
+        Deque<DirectoryEntry> dfs = new ArrayDeque<>();
+        Set<Path> visited = new HashSet<>();
+
+        try {
+            // Opened inside the try so that the finally block owns every stream ever pushed.
+            dfs.push(new DirectoryEntry(Files.newDirectoryStream(topPath, PLUGIN_PATH_FILTER)));
+            visited.add(topPath);
+
+            while (!dfs.isEmpty()) {
+                Iterator<Path> neighbors = dfs.peek().iterator;
+                if (!neighbors.hasNext()) {
+                    dfs.pop().stream.close();
+                    continue;
+                }
+
+                Path adjacent = neighbors.next();
+                if (Files.isSymbolicLink(adjacent)) {
+                    adjacent = resolveSymbolicLink(adjacent);
+                    if (adjacent == null) {
+                        continue;
+                    }
+                }
+
+                if (!visited.add(adjacent)) {
+                    // Already handled, e.g. reached a second time through a symbolic link.
+                    continue;
+                }
+
+                if (isArchive(adjacent)) {
+                    archives.add(adjacent);
+                } else if (isClassFile(adjacent)) {
+                    containsClassFiles = true;
+                } else if (Files.isDirectory(adjacent)) {
+                    dfs.push(new DirectoryEntry(
+                        Files.newDirectoryStream(adjacent, PLUGIN_PATH_FILTER)));
+                } else {
+                    // A link accepted by its own name can resolve to an unrelated regular file.
+                    log.warn("Path '{}' is not a directory, archive or class file. Ignoring this path.",
+                        adjacent);
+                }
+            }
+        } finally {
+            closeRemaining(dfs);
+        }
+
+        if (containsClassFiles) {
+            if (archives.isEmpty()) {
+                return Collections.singletonList(topPath);
+            }
+            log.warn("Plugin path contains both java archives and class files. Returning only the"
+                + " archives");
+        }
+        return new ArrayList<>(archives);
+    }
+
+    /**
+     * Resolves a symbolic link relative to its parent directory.
+     *
+     * @return the real path of the link target, or {@code null} if the link has no parent, the
+     *     target does not exist, or resolution fails (the failure is logged)
+     */
+    private static Path resolveSymbolicLink(Path link) {
+        try {
+            Path target = Files.readSymbolicLink(link);
+            Path parent = link.getParent();
+            if (parent == null) {
+                return null;
+            }
+            Path absolute = parent.resolve(target).toRealPath();
+            return Files.exists(absolute) ? absolute : null;
+        } catch (IOException e) {
+            log.warn(
+                "Resolving symbolic link '{}' failed. Ignoring this path.",
+                link,
+                e
+            );
+            return null;
+        }
+    }
+
+    /**
+     * Closes every stream still on the stack. The stack is only non-empty here when the walk was
+     * aborted by an exception, so close failures are logged rather than thrown: that keeps the
+     * original exception visible and makes sure one failing close cannot leak the other streams.
+     */
+    private static void closeRemaining(Deque<DirectoryEntry> pending) {
+        while (!pending.isEmpty()) {
+            DirectoryStream<Path> stream = pending.pop().stream;
+            try {
+                stream.close();
+            } catch (IOException e) {
+                log.warn("Closing a directory stream failed. Continuing with the remaining streams.", e);
+            }
+        }
+    }
+
+    /** An open directory listing together with its single iterator. */
+    private static class DirectoryEntry {
+        final DirectoryStream<Path> stream;
+        final Iterator<Path> iterator;
+
+        DirectoryEntry(DirectoryStream<Path> stream) {
+            this.stream = stream;
+            this.iterator = stream.iterator();
+        }
+    }
+
+    /**
+     * Tells whether a class must be loaded by the parent loader instead of in isolation.
+     *
+     * @param name fully qualified class name
+     * @return {@code true} if the name lies in one of the blacklisted packages or is exactly
+     *     {@code io.openmessaging.KeyValue}
+     */
+    public static boolean shouldNotLoadInIsolation(String name) {
+        return BLACKLIST.matcher(name).matches();
+    }
+
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/plugin/PluginWrapper.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/plugin/PluginWrapper.java
new file mode 100644
index 0000000..2acaef6
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/plugin/PluginWrapper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin;
+
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+
+import java.util.Locale;
+
+public class PluginWrapper<T> {
+ private final Class<? extends T> klass;
+ private final String name;
+ private final PluginType type;
+ private final String typeName;
+ private final String location;
+ private final ClassLoader classLoader;
+
+ public PluginWrapper(Class<? extends T> klass, ClassLoader loader) {
+ this.klass = klass;
+ this.name = klass.getName();
+ this.type = PluginType.from(klass);
+ this.typeName = type.toString();
+ this.classLoader = loader;
+ this.location = loader instanceof PluginClassLoader
+ ? ((PluginClassLoader) loader).location()
+ : "classpath";
+ }
+
+ public ClassLoader getClassLoader() {
+ return this.classLoader;
+ }
+
+ public enum PluginType {
+ SOURCE(SourceConnector.class),
+ SINK(SinkConnector.class),
+ UNKNOWN(Object.class);
+
+ private Class<?> klass;
+
+ PluginType(Class<?> klass) {
+ this.klass = klass;
+ }
+
+ public static PluginType from(Class<?> klass) {
+ for (PluginType type : PluginType.values()) {
+ if (type.klass.isAssignableFrom(klass)) {
+ return type;
+ }
+ }
+ return UNKNOWN;
+ }
+
+ public String simpleName() {
+ return klass.getSimpleName();
+ }
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase(Locale.ROOT);
+ }
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/store/FileBaseKeyValueStore.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/store/FileBaseKeyValueStore.java
new file mode 100644
index 0000000..61654c8
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/store/FileBaseKeyValueStore.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common.store;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.Converter;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.FileAndPropertyUtil;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * File based Key value store.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class FileBaseKeyValueStore<K, V> extends MemoryBasedKeyValueStore<K, V> {
+
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+
+ private String configFilePath;
+ private Converter keyConverter;
+ private Converter valueConverter;
+
+ public FileBaseKeyValueStore(String configFilePath,
+ Converter keyConverter,
+ Converter valueConverter) {
+
+ super();
+ this.configFilePath = configFilePath;
+ this.keyConverter = keyConverter;
+ this.valueConverter = valueConverter;
+ }
+
+ public String encode() {
+
+ Map<String, String> map = new HashMap<>();
+ for (K key : data.keySet()) {
+ byte[] keyByte = keyConverter.objectToByte(key);
+ byte[] valueByte = valueConverter.objectToByte(data.get(key));
+ map.put(Base64.getEncoder().encodeToString(keyByte), Base64.getEncoder().encodeToString(valueByte));
+ }
+ return JSON.toJSONString(map);
+ }
+
+ public void decode(String jsonString) {
+
+ Map<K, V> resultMap = new HashMap<>();
+ Map<String, String> map = JSON.parseObject(jsonString, Map.class);
+ for (String key : map.keySet()) {
+ K decodeKey = (K) keyConverter.byteToObject(Base64.getDecoder().decode(key));
+ V decodeValue = (V) valueConverter.byteToObject(Base64.getDecoder().decode(map.get(key)));
+ resultMap.put(decodeKey, decodeValue);
+ }
+ this.data = resultMap;
+ }
+
+ @Override
+ public boolean load() {
+ String fileName = null;
+ try {
+ fileName = this.configFilePath;
+ String jsonString = FileAndPropertyUtil.file2String(fileName);
+
+ if (null == jsonString || jsonString.length() == 0) {
+ return this.loadBak();
+ } else {
+ this.decode(jsonString);
+ log.info("load " + fileName + " OK");
+ return true;
+ }
+ } catch (Exception e) {
+ log.error("load " + fileName + " failed, and try to load backup file", e);
+ return this.loadBak();
+ }
+ }
+
+ private boolean loadBak() {
+ String fileName = null;
+ try {
+ fileName = this.configFilePath;
+ String jsonString = FileAndPropertyUtil.file2String(fileName + ".bak");
+ if (jsonString != null && jsonString.length() > 0) {
+ this.decode(jsonString);
+ log.info("load " + fileName + " OK");
+ return true;
+ }
+ } catch (Exception e) {
+ log.error("load " + fileName + " Failed", e);
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public void persist() {
+
+ String jsonString = this.encode();
+ if (jsonString != null) {
+ String fileName = this.configFilePath;
+ try {
+ FileAndPropertyUtil.string2File(jsonString, fileName);
+ } catch (IOException e) {
+ log.error("persist file " + fileName + " exception", e);
+ }
+ }
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/store/KeyValueStore.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/store/KeyValueStore.java
new file mode 100644
index 0000000..d136589
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/store/KeyValueStore.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common.store;
+
+import java.util.Map;
+
+/**
+ * Key value based store interface.
+ *
+ * <p>Declares map-like access ({@code put}, {@code get}, {@code remove}, {@code size},
+ * {@code containsKey}), {@link #load()} / {@link #persist()} hooks for a backing store, and a
+ * {@link Stage} tag returned by {@link #getStage()}.
+ *
+ * @param <K> type of the keys
+ * @param <V> type of the values
+ */
+public interface KeyValueStore<K, V> {
+
+ /**
+ * Put a key/value into the store.
+ *
+ * @param key key to store the value under
+ * @param value value to associate with {@code key}
+ * @return a value of type {@code V}; presumably the value previously stored under {@code key},
+ * as with {@link Map#put} (TODO confirm for each implementation)
+ */
+ V put(K key, V value);
+
+ /**
+ * Put a set of key/value into the store.
+ *
+ * @param map entries to add to the store
+ */
+ void putAll(Map<K, V> map);
+
+ /**
+ * Remove a specified key.
+ *
+ * @param key key to remove
+ * @return a value of type {@code V}; presumably the value that was stored under {@code key},
+ * as with {@link Map#remove} (TODO confirm for each implementation)
+ */
+ V remove(K key);
+
+ /**
+ * Get the size of current key/value store.
+ *
+ * @return number of entries in the store
+ */
+ int size();
+
+ /**
+ * Whether a key is contained in current store.
+ *
+ * @param key key to look for
+ * @return {@code true} if the store contains {@code key}
+ */
+ boolean containsKey(K key);
+
+ /**
+ * Get the value of a key.
+ *
+ * @param key key to look up
+ * @return the value stored under {@code key}; what is returned for an absent key is up to the
+ * implementation
+ */
+ V get(K key);
+
+ /**
+ * Get all data from the current store. Not recommended when the data set is large.
+ *
+ * @return the entries of the store as a map; whether this is a copy or a live view is up to
+ * the implementation
+ */
+ Map<K, V> getKVMap();
+
+ /**
+ * Load the data from the backing store.
+ *
+ * @return a success flag; presumably {@code true} when loading succeeded (TODO confirm for each
+ * implementation)
+ */
+ boolean load();
+
+ /**
+ * Persist all data into the store.
+ */
+ void persist();
+
+
+ /**
+ * Get the stage this store is tagged with.
+ *
+ * @return one of the {@link Stage} constants
+ */
+ Stage getStage();
+
+
+ /**
+ * Tag returned by {@link #getStage()}. NOTE(review): the names suggest the kind of data a
+ * store holds (connector, task, position, or anything); confirm against the callers.
+ */
+ enum Stage {
+ CONNECTOR,
+ TASK,
+ POSITION,
+ UNIVERSAL
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/store/MemoryBasedKeyValueStore.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/store/MemoryBasedKeyValueStore.java
new file mode 100644
index 0000000..06c5258
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/store/MemoryBasedKeyValueStore.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common.store;
+
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * {@link KeyValueStore} that keeps all entries in an in-memory map.
+ *
+ * <p>The map created by the constructor is a {@link ConcurrentHashMap}. {@link #load()} and
+ * {@link #persist()} do nothing here; subclasses may override them to add a backing store.
+ *
+ * @param <K> type of the keys
+ * @param <V> type of the values
+ */
+public class MemoryBasedKeyValueStore<K, V> implements KeyValueStore<K, V> {
+
+    /** Backing map; accessible to subclasses, which may replace it. */
+    protected Map<K, V> data;
+
+    /** Creates an empty store. */
+    public MemoryBasedKeyValueStore() {
+        data = new ConcurrentHashMap<>();
+    }
+
+    /** Delegates to the backing map's {@code put}. */
+    @Override
+    public V put(K key, V value) {
+        return data.put(key, value);
+    }
+
+    /** Delegates to the backing map's {@code putAll}. */
+    @Override
+    public void putAll(Map<K, V> map) {
+        this.data.putAll(map);
+    }
+
+    /** Delegates to the backing map's {@code remove}. */
+    @Override
+    public V remove(K key) {
+        return data.remove(key);
+    }
+
+    /** Delegates to the backing map's {@code size}. */
+    @Override
+    public int size() {
+        return data.size();
+    }
+
+    /** Delegates to the backing map's {@code containsKey}. */
+    @Override
+    public boolean containsKey(K key) {
+        return data.containsKey(key);
+    }
+
+    /** Delegates to the backing map's {@code get}. */
+    @Override
+    public V get(K key) {
+        return data.get(key);
+    }
+
+    /** Returns the backing map itself, not a copy. */
+    @Override
+    public Map<K, V> getKVMap() {
+        return data;
+    }
+
+    /** Nothing to load for a purely in-memory store; always reports success. */
+    @Override
+    public boolean load() {
+        return true;
+    }
+
+    /** Nothing to persist for a purely in-memory store. */
+    @Override
+    public void persist() {
+        // intentionally empty
+    }
+
+    /** Always {@link Stage#UNIVERSAL}. */
+    @Override
+    public Stage getStage() {
+        return Stage.UNIVERSAL;
+    }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimeConfigDefine.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimeConfigDefine.java
new file mode 100644
index 0000000..3a208d2
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimeConfigDefine.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.config;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Define keys for connector and task configs.
+ */
+public class RuntimeConfigDefine {
+
+    /**
+     * The full class name of a specific connector implementation.
+     */
+    public static final String CONNECTOR_CLASS = "connector-class";
+
+    public static final String CONNECTOR_DIRECT_ENABLE = "connector-direct-enable";
+
+    public static final String TASK_CLASS = "task-class";
+
+    public static final String TASK_ID = "task-id";
+
+    public static final String TASK_TYPE = "task-type";
+
+    public static final String SOURCE_TASK_CLASS = "source-task-class";
+
+    public static final String SINK_TASK_CLASS = "sink-task-class";
+
+    public static final String MAX_TASK = "max-task";
+
+    public static final String CONNECTOR_ID = "connector-id";
+
+    /**
+     * Last updated time of the configuration.
+     */
+    public static final String UPDATE_TIMESTAMP = "update-timestamp";
+
+    /**
+     * Whether the current config is deleted.
+     */
+    public static final String CONFIG_DELETED = "config-deleted";
+
+    /**
+     * The full class name of the record converter, which is used to parse {@link ConnectRecord}
+     * to/from byte[].
+     */
+    public static final String SOURCE_RECORD_CONVERTER = "source-record-converter";
+
+    public static final String NAMESRV_ADDR = "namesrv-addr";
+
+    public static final String RMQ_PRODUCER_GROUP = "rmq-producer-group";
+
+    /**
+     * Key of the consumer group. The constant name is misspelled ("CONSUMNER") and is kept as is
+     * so that existing references keep compiling.
+     */
+    public static final String RMQ_CONSUMNER_GROUP = "rmq-consumer-group";
+
+    public static final String OPERATION_TIMEOUT = "operation-timeout";
+
+    public static final String HASH_FUNC = "consistentHashFunc";
+
+    public static final String VIRTUAL_NODE = "virtualNode";
+
+    public static final String CONNECT_SHARDINGKEY = "connect-shardingkey";
+
+    public static final String CONNECT_TOPICNAME = "connect-topicname";
+
+    public static final String CONNECT_TOPICNAMES = "connect-topicnames";
+
+    public static final String CONNECT_SOURCE_PARTITION = "connect-source-partition";
+
+    public static final String CONNECT_SOURCE_POSITION = "connect-source-position";
+
+    public static final String CONNECT_ENTRYTYPE = "connect-entrytype";
+
+    public static final String CONNECT_TIMESTAMP = "connect-timestamp";
+
+    public static final String CONNECT_SCHEMA = "connect-schema";
+
+    public static final String TRANSFORMS = "transforms";
+
+    /**
+     * The required keys for all configurations: {@link #CONNECTOR_CLASS} and
+     * {@link #CONNECT_TOPICNAME}.
+     *
+     * <p>Built as a plain {@link HashSet} in a static initializer instead of with double-brace
+     * initialization, which would create an anonymous {@code HashSet} subclass. The set stays
+     * mutable, as before.
+     */
+    public static final Set<String> REQUEST_CONFIG = new HashSet<>();
+
+    static {
+        REQUEST_CONFIG.add(CONNECTOR_CLASS);
+        REQUEST_CONFIG.add(CONNECT_TOPICNAME);
+    }
+
+    /**
+     * Maximum allowed message size in bytes. Read from the system property
+     * {@code rocketmq.runtime.max.message.size}; the default value is 4M (4194304).
+     */
+    public static final int MAX_MESSAGE_SIZE = Integer.parseInt(System.getProperty("rocketmq.runtime.max.message.size", "4194304"));
+
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfig.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfig.java
new file mode 100644
index 0000000..af4cafa
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.config;
+
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Configurable;
+import org.springframework.beans.factory.annotation.Value;
+
+/**
+ * Settings of the runtimer, injected from properties through {@link Value} placeholders.
+ * Each placeholder has an empty default, so an unset property yields an empty string.
+ *
+ * <p>NOTE(review): the class is annotated with {@code @Configurable}; confirm that this, rather
+ * than a bean stereotype such as {@code @Configuration} or {@code @Component}, is what makes
+ * Spring populate the fields in this application.
+ */
+@Data
+@Configurable
+public class RuntimerConfig {
+
+    /**
+     * Name of the runtimer. Read from {@code runtimer.name}, consistent with the other
+     * {@code runtimer.*} keys; the previously used misspelled key {@code rumtimer.name} is still
+     * honoured as a fallback so existing configuration keeps working.
+     */
+    @Value("${runtimer.name:${rumtimer.name:}}")
+    private String runtimeName;
+
+    /** Plugin path, read from {@code runtimer.pluginpath}. */
+    @Value("${runtimer.pluginpath:}")
+    private String pluginPath;
+
+    /** Root directory for stored files, read from {@code runtimer.storePathRootDir}. */
+    @Value("${runtimer.storePathRootDir:}")
+    private String storePathRootDir;
+
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/TraceConfig.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/TraceConfig.java
new file mode 100644
index 0000000..d8b0dbb
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/TraceConfig.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.config;
+
+/**
+ * Empty configuration class: it currently declares no fields or methods.
+ * NOTE(review): presumably reserved for trace-related settings - confirm before relying on it.
+ */
+public class TraceConfig {
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/converter/JsonConverter.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/converter/JsonConverter.java
new file mode 100644
index 0000000..efc7d39
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/converter/JsonConverter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.converter;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.Converter;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Use fastJson to convert between objects and UTF-8 encoded JSON bytes.
+ */
+public class JsonConverter implements Converter {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+
+    /** Charset constant avoids the by-name lookup and its checked exception. */
+    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+    /** Target type for {@link #byteToObject(byte[])}; {@code null} means "parse generically". */
+    private Class clazz;
+
+    /** Creates a converter without a target type. */
+    public JsonConverter() {
+        this.clazz = null;
+    }
+
+    /**
+     * @param clazz type that {@link #byteToObject(byte[])} parses JSON into
+     */
+    public JsonConverter(Class clazz) {
+        this.clazz = clazz;
+    }
+
+    /**
+     * @param clazz type that {@link #byteToObject(byte[])} parses JSON into, or {@code null} to
+     *     parse generically
+     */
+    public void setClazz(Class clazz) {
+        this.clazz = clazz;
+    }
+
+    /**
+     * Serializes an object to JSON and encodes it as UTF-8.
+     *
+     * @param object object to serialize
+     * @return the UTF-8 bytes of the JSON text, or an empty array if serialization failed (the
+     *     failure is logged)
+     */
+    @Override
+    public byte[] objectToByte(Object object) {
+        try {
+            String json = JSON.toJSONString(object);
+            return json.getBytes(UTF_8);
+        } catch (Exception e) {
+            log.error("JsonConverter#objectToByte failed", e);
+        }
+        return new byte[0];
+    }
+
+    /**
+     * Decodes UTF-8 bytes and parses them as JSON.
+     *
+     * <p>Parsing failures are not caught here and propagate to the caller, as before; the only
+     * exception the previous implementation caught was the charset lookup failure, which cannot
+     * occur for UTF-8.
+     *
+     * @param bytes UTF-8 encoded JSON text
+     * @return an instance of the configured type if one was set, otherwise the generic parse
+     *     result
+     */
+    @Override
+    public Object byteToObject(byte[] bytes) {
+        String text = new String(bytes, UTF_8);
+        if (clazz != null) {
+            return JSON.parseObject(text, clazz);
+        }
+        return JSON.parse(text);
+    }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/converter/ListConverter.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/converter/ListConverter.java
new file mode 100644
index 0000000..3ee9929
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/converter/ListConverter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.converter;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import io.openmessaging.connector.api.data.Converter;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+/**
+ * Converts between a {@link List} and its UTF-8 encoded JSON array representation using fastjson.
+ */
+public class ListConverter implements Converter<List> {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+
+    /** Element type handed to fastjson when a JSON array is parsed. */
+    private Class clazz;
+
+    /**
+     * @param clazz element type that parsed array entries are mapped to
+     */
+    public ListConverter(Class clazz) {
+        this.clazz = clazz;
+    }
+
+    /**
+     * Serializes the list to JSON and encodes the text as UTF-8.
+     *
+     * @param list list to serialize
+     * @return the encoded JSON, or an empty array (never {@code null}) when encoding fails,
+     *         which is the same failure value JsonConverter and RecordOffsetConverter return
+     */
+    @Override
+    public byte[] objectToByte(List list) {
+        try {
+            return JSON.toJSONString(list).getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            log.error("ListConverter#objectToByte failed", e);
+        }
+        // Return an empty payload instead of null so callers never dereference a null array.
+        return new byte[0];
+    }
+
+    /**
+     * Decodes the bytes as UTF-8 and parses them as a JSON array of the configured element type.
+     *
+     * @param bytes UTF-8 encoded JSON array
+     * @return the parsed list, or {@code null} when the bytes cannot be decoded
+     */
+    @Override
+    public List byteToObject(byte[] bytes) {
+        try {
+            String json = new String(bytes, "UTF-8");
+            return JSONArray.parseArray(json, clazz);
+        } catch (UnsupportedEncodingException e) {
+            log.error("ListConverter#byteToObject failed", e);
+        }
+        return null;
+    }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/converter/RecordOffsetConverter.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/converter/RecordOffsetConverter.java
new file mode 100644
index 0000000..f039d9c
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/converter/RecordOffsetConverter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.converter;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.Converter;
+import io.openmessaging.connector.api.data.RecordOffset;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Converts a {@link RecordOffset} to and from UTF-8 encoded JSON bytes using fastjson.
+ */
+public class RecordOffsetConverter implements Converter<RecordOffset> {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+
+    /**
+     * Serializes the offset to JSON and encodes the text as UTF-8.
+     *
+     * @param recordOffset offset to serialize
+     * @return the encoded JSON, or an empty array when serialization or encoding fails
+     */
+    @Override
+    public byte[] objectToByte(RecordOffset recordOffset) {
+        try {
+            String json = JSON.toJSONString(recordOffset);
+            return json.getBytes("UTF-8");
+        } catch (Exception e) {
+            // Name this class in the message so failures are not attributed to JsonConverter.
+            log.error("RecordOffsetConverter#objectToByte failed", e);
+        }
+        return new byte[0];
+    }
+
+    /**
+     * Decodes the bytes as UTF-8 and parses them into a {@link RecordOffset}.
+     *
+     * @param bytes UTF-8 encoded JSON
+     * @return the parsed offset, or {@code null} when the bytes cannot be decoded
+     */
+    @Override
+    public RecordOffset byteToObject(byte[] bytes) {
+        try {
+            String text = new String(bytes, "UTF-8");
+            return JSON.parseObject(text, RecordOffset.class);
+        } catch (UnsupportedEncodingException e) {
+            log.error("RecordOffsetConverter#byteToObject failed", e);
+        }
+        return null;
+    }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java
new file mode 100644
index 0000000..88a2001
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import io.openmessaging.connector.api.component.connector.Connector;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Manages the connector and task configs used by the pusher and lets callers subscribe to
+ * config updates.
+ */
+public interface PusherConfigManageService {
+
+ /**
+ * Get all connector configs.
+ *
+ * @return connector configs keyed by connector name
+ */
+ Map<String, TargetKeyValue> getConnectorConfigs();
+
+ /**
+ * Put the configs of a connector.
+ *
+ * @param connectorName name of the connector the configs belong to
+ * @param configs connector configs to store
+ * @return a message describing the outcome; PusherConfigManageServiceImpl returns an empty
+ * string on success and a description of the problem otherwise
+ * @throws Exception if the configs cannot be applied
+ */
+ String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception;
+
+ /**
+ * Remove the connector.
+ *
+ * @param connectorName name of the connector to remove
+ */
+ void removeConnectorConfig(String connectorName);
+
+ /**
+ * Recompute the task configs of a connector from its connector configs.
+ *
+ * @param connectorName name of the connector
+ * @param connector connector instance that supplies the task configs
+ * @param currentTimestamp update timestamp to record on the task configs
+ * @param configs connector configs the task configs are derived from
+ */
+ void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs);
+
+ /**
+ * Get all task configs.
+ *
+ * @return task config lists keyed by connector name
+ */
+ Map<String, List<TargetKeyValue>> getTaskConfigs();
+
+ /**
+ * Get the target info.
+ *
+ * @return one entity per connector, carrying the connector name and its task configs
+ */
+ Set<PusherTargetEntity> getTargetInfo();
+
+ /**
+ * Get all topics.
+ *
+ * @return names of the connect topics
+ */
+ List<String> getConnectTopics();
+
+ /**
+ * Persist all the configs in a store.
+ */
+ void persist();
+
+ /**
+ * Register a listener to listen to all config update operations.
+ *
+ * @param listener listener to notify
+ */
+ void registerListener(TargetConfigUpdateListener listener);
+
+ /**
+ * Callback for config updates.
+ */
+ interface TargetConfigUpdateListener {
+
+ /**
+ * Invoked when a connector config changes.
+ *
+ * @param targetEntity connector name together with its updated task configs
+ */
+ void onConfigUpdate(PusherTargetEntity targetEntity);
+ }
+
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
new file mode 100644
index 0000000..1dde731
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import com.google.common.collect.Lists;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.connector.Connector;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.FilePathConfigUtil;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.FileBaseKeyValueStore;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.KeyValueStore;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.JsonConverter;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.ListConverter;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import javax.annotation.PostConstruct;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * {@link PusherConfigManageService} that keeps connector configs and task configs in two
+ * {@link FileBaseKeyValueStore}s and notifies registered listeners whenever the task configs
+ * of a connector are replaced.
+ */
+@Service
+public class PusherConfigManageServiceImpl implements PusherConfigManageService {
+
+    /**
+     * plugin for recognize class loader
+     */
+    private Plugin plugin;
+
+    /**
+     * Current connector configs in the store.
+     */
+    private KeyValueStore<String, TargetKeyValue> connectorKeyValueStore;
+
+    /**
+     * Current task configs in the store.
+     */
+    private KeyValueStore<String, List<TargetKeyValue>> taskKeyValueStore;
+
+    /**
+     * All listeners to trigger while config change.
+     */
+    private Set<TargetConfigUpdateListener> targetConfigUpdateListeners;
+
+    /**
+     * Topic names collected from the connector configs while task configs are recomputed.
+     */
+    private Set<String> connectTopicNames;
+
+    @Value("${runtimer.storePathRootDir:}")
+    private String storeRootPath;
+
+    /**
+     * @param plugin plugin registry used to look up the class loader of a connector class
+     */
+    public PusherConfigManageServiceImpl(Plugin plugin){
+        this.plugin = plugin;
+        this.connectTopicNames = new CopyOnWriteArraySet<>();
+        this.targetConfigUpdateListeners = new HashSet<>();
+    }
+
+    /**
+     * Creates both stores under the configured root path and loads their content. Declared
+     * as {@code @PostConstruct} rather than done in the constructor, presumably so that the
+     * injected {@code storeRootPath} is already set.
+     */
+    @PostConstruct
+    public void initStoreKeyValue(){
+        this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
+            FilePathConfigUtil.getConnectorConfigPath(this.storeRootPath),
+            new JsonConverter(),
+            new JsonConverter(TargetKeyValue.class));
+        this.taskKeyValueStore = new FileBaseKeyValueStore<>(
+            FilePathConfigUtil.getTaskConfigPath(this.storeRootPath),
+            new JsonConverter(),
+            new ListConverter(TargetKeyValue.class));
+        this.connectorKeyValueStore.load();
+        this.taskKeyValueStore.load();
+    }
+
+    /**
+     * get all connector configs enabled
+     *
+     * @return the stored connector configs whose deleted flag is 0, keyed by connector name
+     */
+    @Override
+    public Map<String, TargetKeyValue> getConnectorConfigs() {
+        Map<String, TargetKeyValue> result = new HashMap<>();
+        Map<String, TargetKeyValue> connectorConfigs = connectorKeyValueStore.getKVMap();
+        // Iterate entries so key and value come from the same view instead of a second lookup.
+        for (Map.Entry<String, TargetKeyValue> entry : connectorConfigs.entrySet()) {
+            TargetKeyValue config = entry.getValue();
+            if (0 != config.getInt(RuntimeConfigDefine.CONFIG_DELETED)) {
+                continue;
+            }
+            result.put(entry.getKey(), config);
+        }
+        return result;
+    }
+
+    /**
+     * Stores the connector configs, instantiates the connector class they name and
+     * recomputes the connector's task configs.
+     *
+     * @param connectorName name of the connector
+     * @param configs connector configs; the update timestamp entry is overwritten here
+     * @return an empty string on success, otherwise a message saying that an identical
+     *         config already exists or which required key is missing
+     * @throws Exception if the connector class cannot be loaded or instantiated, or if the
+     *         connector itself throws while validating or initializing
+     */
+    @Override
+    public String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception {
+        TargetKeyValue exist = connectorKeyValueStore.get(connectorName);
+        if (null != exist) {
+            // Copy the stored timestamp so the equality check below ignores it.
+            Long updateTimestamp = exist.getLong(RuntimeConfigDefine.UPDATE_TIMESTAMP);
+            if (null != updateTimestamp) {
+                configs.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, updateTimestamp);
+            }
+        }
+        if (configs.equals(exist)) {
+            return "Connector with same config already exist.";
+        }
+
+        Long currentTimestamp = System.currentTimeMillis();
+        configs.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
+        for (String requireConfig : RuntimeConfigDefine.REQUEST_CONFIG) {
+            if (!configs.containsKey(requireConfig)) {
+                return "Request config key: " + requireConfig;
+            }
+        }
+
+        String connectorClass = configs.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
+        ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
+        Class<?> clazz;
+        if (null != classLoader) {
+            clazz = Class.forName(connectorClass, true, classLoader);
+        } else {
+            clazz = Class.forName(connectorClass);
+        }
+        final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
+        connector.validate(configs);
+        connector.init(configs);
+        connectorKeyValueStore.put(connectorName, configs);
+        recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
+        return "";
+    }
+
+    /**
+     * Builds one task config per entry returned by the connector, enriches it with the task
+     * class, timestamp, topic settings and every transform entry of the connector config,
+     * and stores the result as the connector's task configs.
+     *
+     * @param connectorName name of the connector
+     * @param connector connector that supplies the raw task configs and the task class
+     * @param currentTimestamp update timestamp written into every task config
+     * @param configs connector configs to copy topic and transform settings from
+     */
+    @Override
+    public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs) {
+        int maxTask = configs.getInt(RuntimeConfigDefine.MAX_TASK, 1);
+        List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
+        List<TargetKeyValue> convertedConfigs = new ArrayList<>();
+        for (KeyValue keyValue : taskConfigs) {
+            TargetKeyValue newKeyValue = new TargetKeyValue();
+            for (String key : keyValue.keySet()) {
+                newKeyValue.put(key, keyValue.getString(key));
+            }
+            newKeyValue.put(RuntimeConfigDefine.TASK_CLASS, connector.taskClass().getName());
+            newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
+
+            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
+            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAMES));
+            Set<String> connectConfigKeySet = configs.keySet();
+            for (String connectConfigKey : connectConfigKeySet) {
+                if (connectConfigKey.startsWith(RuntimeConfigDefine.TRANSFORMS)) {
+                    newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
+                }
+            }
+            convertedConfigs.add(newKeyValue);
+            // Kept inside the loop: the topic is only recorded when at least one task exists.
+            connectTopicNames.add(configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
+        }
+        putTaskConfigs(connectorName, convertedConfigs);
+    }
+
+    /**
+     * Flags the connector config as deleted, appends the flagged config to the connector's
+     * task configs and stores both. Does nothing when the connector is unknown.
+     *
+     * @param connectorName name of the connector to remove
+     */
+    @Override
+    public void removeConnectorConfig(String connectorName) {
+        TargetKeyValue config = connectorKeyValueStore.get(connectorName);
+        if (Objects.isNull(config)) {
+            return;
+        }
+        config.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, System.currentTimeMillis());
+        config.put(RuntimeConfigDefine.CONFIG_DELETED, 1);
+        // A connector may have no task configs stored; start from an empty list in that case
+        // and work on a copy so the list held by the store is not mutated in place.
+        List<TargetKeyValue> taskConfigList = new ArrayList<>();
+        List<TargetKeyValue> existingTaskConfigs = taskKeyValueStore.get(connectorName);
+        if (null != existingTaskConfigs) {
+            taskConfigList.addAll(existingTaskConfigs);
+        }
+        taskConfigList.add(config);
+        connectorKeyValueStore.put(connectorName, config);
+        putTaskConfigs(connectorName, taskConfigList);
+    }
+
+    /**
+     * @return the task configs of every connector returned by {@link #getConnectorConfigs()},
+     *         keyed by connector name
+     */
+    @Override
+    public Map<String, List<TargetKeyValue>> getTaskConfigs() {
+        Map<String, List<TargetKeyValue>> result = new HashMap<>();
+        Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap();
+        Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs();
+        for (Map.Entry<String, List<TargetKeyValue>> entry : taskConfigs.entrySet()) {
+            if (!filteredConnector.containsKey(entry.getKey())) {
+                continue;
+            }
+            result.put(entry.getKey(), entry.getValue());
+        }
+        return result;
+    }
+
+    /**
+     * @return one {@link PusherTargetEntity} per connector returned by
+     *         {@link #getConnectorConfigs()} that has stored task configs
+     */
+    @Override
+    public Set<PusherTargetEntity> getTargetInfo() {
+        Set<PusherTargetEntity> result = new HashSet<>();
+        Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap();
+        Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs();
+        for (Map.Entry<String, List<TargetKeyValue>> entry : taskConfigs.entrySet()) {
+            if (!filteredConnector.containsKey(entry.getKey())) {
+                continue;
+            }
+            PusherTargetEntity targetEntity = new PusherTargetEntity();
+            targetEntity.setConnectName(entry.getKey());
+            targetEntity.setTargetKeyValues(entry.getValue());
+            result.add(targetEntity);
+        }
+        return result;
+    }
+
+    /**
+     * @return a new list holding the topic names recorded so far; empty when there are none
+     */
+    @Override
+    public List<String> getConnectTopics(){
+        if (CollectionUtils.isEmpty(connectTopicNames)) {
+            return Lists.newArrayList();
+        }
+        return Lists.newArrayList(connectTopicNames);
+    }
+
+    /**
+     * Persists the connector store and then the task store.
+     */
+    @Override
+    public void persist() {
+        this.connectorKeyValueStore.persist();
+        this.taskKeyValueStore.persist();
+    }
+
+    /**
+     * @param listener listener to notify whenever task configs are replaced
+     */
+    @Override
+    public void registerListener(TargetConfigUpdateListener listener) {
+        this.targetConfigUpdateListeners.add(listener);
+    }
+
+    /**
+     * put target task key config for update
+     *
+     * @param connectorName name of the connector whose task configs are replaced
+     * @param configs new task configs
+     */
+    private void putTaskConfigs(String connectorName, List<TargetKeyValue> configs) {
+        List<TargetKeyValue> exist = taskKeyValueStore.get(connectorName);
+        if (null != exist && exist.size() > 0) {
+            taskKeyValueStore.remove(connectorName);
+        }
+        taskKeyValueStore.put(connectorName, configs);
+        PusherTargetEntity targetEntity = new PusherTargetEntity();
+        targetEntity.setConnectName(connectorName);
+        targetEntity.setTargetKeyValues(configs);
+        triggerListener(targetEntity);
+        persistStore();
+    }
+
+    /**
+     * Currently a no-op: nothing is written from here.
+     * NOTE(review): confirm whether this is meant to delegate to {@link #persist()}.
+     */
+    private void persistStore() {
+
+    }
+
+    /**
+     * trigger new target task config for update
+     *
+     * @param pusherTargetEntity connector name and task configs handed to every listener
+     */
+    private void triggerListener(PusherTargetEntity pusherTargetEntity) {
+        if (CollectionUtils.isEmpty(this.targetConfigUpdateListeners)) {
+            return;
+        }
+
+        for (TargetConfigUpdateListener listener : this.targetConfigUpdateListeners) {
+            listener.onConfigUpdate(pusherTargetEntity);
+        }
+    }
+
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/JaegerTraceStrategy.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/JaegerTraceStrategy.java
new file mode 100644
index 0000000..e29c09b
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/JaegerTraceStrategy.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.trace;
+
+/**
+ * Empty placeholder class: it declares no members and does not implement TraceStrategy.
+ * Presumably reserved for a Jaeger-backed tracing strategy (inferred from the name only).
+ */
+public class JaegerTraceStrategy {
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/TraceConstants.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/TraceConstants.java
new file mode 100644
index 0000000..7adeafe
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/TraceConstants.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.trace;
+
+/**
+ * Empty placeholder class; no constants are defined yet.
+ * Presumably intended to hold tracing-related constants (inferred from the name only).
+ */
+public class TraceConstants {
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/TraceException.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/TraceException.java
new file mode 100644
index 0000000..6baab11
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/TraceException.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.trace;
+
+/**
+ * Checked exception declared by the TraceStrategy operations.
+ * Only the implicit no-argument constructor exists, so no message or cause can be attached.
+ */
+public class TraceException extends Exception{
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/TraceFactory.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/TraceFactory.java
new file mode 100644
index 0000000..f4f3b7d
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/TraceFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.trace;
+
+import java.util.Map;
+
+/**
+ * Entry points for creating and finishing spans. Both methods are empty in this revision,
+ * so calling them has no effect.
+ */
+public class TraceFactory {
+
+ /**
+ * Currently does nothing.
+ *
+ * @param content string key/value pairs for the span (unused at present)
+ */
+ public void createSpan(Map<String,String> content){
+
+ }
+
+ /**
+ * Currently does nothing.
+ *
+ * @param content string key/value pairs for the span (unused at present)
+ */
+ public void finishSpan(Map<String,String> content){
+
+ }
+}
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/context/RocketMQConnectTargetRunnerContext.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/TraceStrategy.java
similarity index 70%
copy from adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/context/RocketMQConnectTargetRunnerContext.java
copy to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/TraceStrategy.java
index eaf5b6c..4020ac3 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/context/RocketMQConnectTargetRunnerContext.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/TraceStrategy.java
@@ -15,14 +15,19 @@
* limitations under the License.
*/
-package org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context;
+package org.apache.rocketmq.eventbridge.adapter.runtimer.trace;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-@AllArgsConstructor
-public @Data class RocketMQConnectTargetRunnerContext {
+/**
+ * EventMeshTraceService
+ * Extensible via SPI.
+ * Wraps different tracers on top of an OpenTelemetry-based implementation.
+ */
+public interface TraceStrategy {
+
+ void init() throws TraceException;
- private String connectorName;
+ void createSpan(String spanName) throws TraceException;
+ void shutdown() throws TraceException;
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/ZipkinTraceStrategy.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/ZipkinTraceStrategy.java
new file mode 100644
index 0000000..7c0d6a9
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/trace/ZipkinTraceStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.trace;
+
+/**
+ * TraceStrategy implementation whose methods are all empty in this revision; no tracing
+ * is performed yet.
+ */
+public class ZipkinTraceStrategy implements TraceStrategy {
+ // No-op: nothing is initialized yet.
+ @Override
+ public void init() throws TraceException {
+
+ }
+
+ // No-op: the span name is ignored and no span is created.
+ @Override
+ public void createSpan(String spanName) throws TraceException {
+
+ }
+
+ // No-op: there is nothing to release.
+ @Override
+ public void shutdown() throws TraceException {
+
+ }
+}
diff --git a/pom.xml b/pom.xml
index 9245802..95bc7de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,6 +130,11 @@
<artifactId>rocketmq-eventbridge-adapter-rpc</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-eventbridge-adapter-runtimer</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Framework -->
<dependency>
diff --git a/start/pom.xml b/start/pom.xml
index c498c08..fcf4649 100644
--- a/start/pom.xml
+++ b/start/pom.xml
@@ -38,6 +38,10 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-eventbridge-adapter-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-eventbridge-adapter-runtimer</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-eventbridge-domain</artifactId>
diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties
index 70c8c2a..1394df6 100644
--- a/start/src/main/resources/application.properties
+++ b/start/src/main/resources/application.properties
@@ -17,10 +17,10 @@ server.port=7001
management.server.port=7002
management.endpoints.web.base-path=/
## database
-spring.datasource.url=jdbc:mysql://localhost:3306/rocketmq_eventbridge?useUnicode=true&characterEncoding=utf8&useSSL=false
+spring.datasource.url=jdbc:mysql://localhost:3306/preview_eventbridge?useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
-spring.datasource.username=******
-spring.datasource.password=******
+spring.datasource.username=root
+spring.datasource.password=Artisan012
mybatis.mapper-locations=classpath:mybatis/*.xml
mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
## flyway
@@ -29,7 +29,12 @@ spring.flyway.placeholderReplacement=false
rocketmq.namesrvAddr=localhost:9876
rocketmq.connect.endpoint=http://127.0.0.1:8082
rocketmq.cluster.name=DefaultCluster
+## runtimer
+rumtimer.name=eventbridge-runtimer
+runtimer.pluginpath=/Users/Local/eventbridge/plugin
+runtimer.storePathRootDir=/Users/Local/eventbridge/store
+
## log
-app.name=rocketmq-eventbridge
+app.name=rocketmqeventbridge
log.level=INFO
-log.path=~
\ No newline at end of file
+log.path=/Users/artisan/logs
\ No newline at end of file