You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2023/03/15 13:41:22 UTC
[rocketmq-eventbridge] branch runtimer updated: feat:add TargetRunnerConfigObserver (#58)
This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/runtimer by this push:
new deae7f7 feat:add TargetRunnerConfigObserver (#58)
deae7f7 is described below
commit deae7f7b7a163962003f031796b8466a6b0b5ee2
Author: shenlin <20...@gmail.com>
AuthorDate: Wed Mar 15 21:41:15 2023 +0800
feat:add TargetRunnerConfigObserver (#58)
feat: add EventSubscriber & Observer.
---
.licenserc.yaml | 1 +
.../RocketMQConnectTargetRunnerAPIImpl.java | 40 +--
.../eventbridge/adapter/runtimer/Runtimer.java | 62 ++---
.../adapter/runtimer/boot/EventBusListener.java | 158 +-----------
.../adapter/runtimer/boot/EventRuleTransfer.java | 96 ++------
.../adapter/runtimer/boot/EventTargetPusher.java | 91 ++++---
.../runtimer/boot/listener/EventSubscriber.java | 73 ++++++
.../runtimer/boot/listener/ListenerFactory.java | 127 +++++++---
.../boot/listener/RocketMQEventSubscriber.java | 43 ++++
.../boot/listener/TargetRunnerListener.java | 44 ++++
...erTargetEntity.java => TargetRunnerConfig.java} | 43 ++--
.../runtimer/config/RuntimerConfigDefine.java | 2 +
.../AbstractTargetRunnerConfigObserver.java | 95 ++++++++
.../service/PusherConfigManageServiceImpl.java | 271 ---------------------
...ervice.java => TargetRunnerConfigObserver.java} | 69 +-----
.../TargetRunnerConfigOnControllerObserver.java | 48 ++++
.../service/TargetRunnerConfigOnDBObserver.java | 60 +++++
.../service/TargetRunnerConfigOnFileObserver.java | 92 +++++++
.../src/main/resources/target-runner.config | 7 +
19 files changed, 704 insertions(+), 718 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index f6d7678..f01ce66 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -23,6 +23,7 @@ header:
paths-ignore:
- 'dist'
+ - 'config'
- 'licenses'
- '**/*.md'
- 'LICENSE'
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
index fda982c..8971028 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
@@ -20,11 +20,8 @@ package org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
-
import java.util.ArrayList;
import java.util.Map;
-import java.util.Objects;
-
import lombok.SneakyThrows;
import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectSourceRunnerContext;
import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectTargetRunnerContext;
@@ -32,7 +29,6 @@ import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.ActionStatus
import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.CreateSinkConnectorRequest;
import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.TransformRequest;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
import org.apache.rocketmq.eventbridge.domain.common.enums.EventTargetStatusEnum;
import org.apache.rocketmq.eventbridge.domain.model.Component;
import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
@@ -45,13 +41,10 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
private final RocketMQConnectClient rocketMQConnectClient;
- private PusherConfigManageService pusherConfigManageService;
-
public RocketMQConnectTargetRunnerAPIImpl(EventDataRepository eventDataRepository,
- RocketMQConnectClient rocketMQConnectClient, PusherConfigManageService pusherConfigManageService) {
+ RocketMQConnectClient rocketMQConnectClient) {
super(eventDataRepository);
this.rocketMQConnectClient = rocketMQConnectClient;
- this.pusherConfigManageService = pusherConfigManageService;
}
@SneakyThrows
@@ -64,14 +57,9 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
TransformRequest filterTransform = this.buildEventBridgeFilterTransform(filterPattern);
TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
- sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
- if(Objects.nonNull(pusherConfigManageService)){
- pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue);
- }else {
- // todo delete
- rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
- sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
- }
+ sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+ rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
+ sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
return new Gson().toJson(context);
}
@@ -87,17 +75,11 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
//create
TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
- sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
- if(Objects.nonNull(pusherConfigManageService)){
- pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue);
- }else {
- // todo delete
- //stop
- this.delete(runContext);
-
- rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
- sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
- }
+ sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+ //stop
+ this.delete(runContext);
+ rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
+ sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
return new Gson().toJson(context);
}
@@ -133,6 +115,7 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
/**
* init sink task config
+ *
* @param name
* @param topicName
* @param sinkClass
@@ -140,7 +123,8 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
* @param transforms
* @return
*/
- private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass, Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) {
+ private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass,
+ Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) {
CreateSinkConnectorRequest request = new CreateSinkConnectorRequest();
request.setName(name);
request.setTopicName(topicName);
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
index 896885d..682b830 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -17,35 +17,33 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.PostConstruct;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventBusListener;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventRuleTransfer;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventTargetPusher;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.RocketMQEventSubscriber;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-import javax.annotation.PostConstruct;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
/**
* event bridge runtimer
*
* @author artisan
*/
@Component
-public class Runtimer extends ServiceThread{
+public class Runtimer {
private static final Logger logger = LoggerFactory.getLogger(Runtimer.class);
@@ -55,7 +53,7 @@ public class Runtimer extends ServiceThread{
private ListenerFactory listenerFactory;
- private PusherConfigManageService pusherConfigManageService;
+ private TargetRunnerConfigObserver targetRunnerConfigObserver;
private Map<String, List<TargetKeyValue>> taskConfigs = new HashMap<>();
@@ -67,51 +65,23 @@ public class Runtimer extends ServiceThread{
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((Runnable r) -> new Thread(r, "RuntimerScheduledThread"));
- public Runtimer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService configManageService) {
+ public Runtimer(Plugin plugin, ListenerFactory listenerFactory, TargetRunnerConfigObserver configManageService) {
this.plugin = plugin;
this.listenerFactory = listenerFactory;
- this.pusherConfigManageService = configManageService;
+ this.targetRunnerConfigObserver = configManageService;
}
@PostConstruct
public void initAndStart() {
logger.info("init runtimer task config");
- this.taskConfigs = pusherConfigManageService.getTaskConfigs();
- listener = new EventBusListener(listenerFactory, pusherConfigManageService);
- listener.initOrUpdateListenConsumer(taskConfigs);
- transfer = new EventRuleTransfer(plugin, listenerFactory, pusherConfigManageService);
- transfer.initOrUpdateTaskTransform(taskConfigs);
- pusher = new EventTargetPusher(plugin, listenerFactory, pusherConfigManageService);
- pusher.initOrUpdatePusherTask(taskConfigs);
+ new EventBusListener(listenerFactory, new RocketMQEventSubscriber(listenerFactory)).start();
+ new EventRuleTransfer(listenerFactory).start();
+ new EventTargetPusher(listenerFactory).start();
startRuntimer();
}
public void startRuntimer() {
runtimerState = new AtomicReference<>(RuntimerState.START);
- this.start();
}
- @Override
- public String getServiceName() {
- return Runtimer.class.getSimpleName();
- }
-
- @Override
- public void run() {
-
- listener.start();
-
- transfer.start();
-
- pusher.start();
-
- scheduledExecutorService.scheduleAtFixedRate(() -> {
- try {
- this.pusherConfigManageService.persist();
- } catch (Exception e) {
- logger.error("schedule persist config error.", e);
- }
- }, 500, 500, TimeUnit.MILLISECONDS);
-
- }
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
index 0c40235..bc44668 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
@@ -17,172 +17,38 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.Lists;
-import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.ConnectRecord;
-import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
-import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
+import java.util.List;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.QueueState;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.CollectionUtils;
-
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.*;
/**
* listen the event and offer to queue
+ *
* @author artisan
*/
public class EventBusListener extends ServiceThread {
- private Logger logger = LoggerFactory.getLogger(EventBusListener.class);
-
- private final ConcurrentHashMap<MessageQueue, Long> messageQueuesOffsetMap;
-
- private final ConcurrentHashMap<MessageQueue, QueueState> messageQueuesStateMap;
-
- private List<String> topics = new CopyOnWriteArrayList<>();
-
- private List<DefaultLitePullConsumer> listenConsumer = new CopyOnWriteArrayList<>();
-
private ListenerFactory listenerFactory;
- private PusherConfigManageService pusherConfigManageService;
-
- private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
-
- private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000);
+ private EventSubscriber eventSubscriber;
- public EventBusListener(ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
- this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256);
- this.messageQueuesStateMap = new ConcurrentHashMap<>(256);
+ public EventBusListener(ListenerFactory listenerFactory,
+ EventSubscriber eventSubscriber) {
this.listenerFactory = listenerFactory;
- this.pusherConfigManageService = pusherConfigManageService;
- this.pusherConfigManageService.registerListener(new ConsumerUpdateListenerImpl());
- }
-
- /**
- * init listen consumer
- * @param taskConfig
- */
- public void initOrUpdateListenConsumer(Map<String, List<TargetKeyValue>> taskConfig){
- if(MapUtils.isEmpty(taskConfig)){
- logger.warn("initListenConsumer by taskConfig param is empty");
- return;
- }
- List<TargetKeyValue> targetKeyValues = initTaskKeyInfo(taskConfig);
- this.topics.addAll(listenerFactory.parseTopicListByList(targetKeyValues));
- for (String topic : topics){
- DefaultLitePullConsumer pullConsumer = listenerFactory.initDefaultMQPullConsumer(topic);
- listenConsumer.add(pullConsumer);
- }
- logger.info("init or update consumer succeed , consumer is - {}", JSON.toJSONString(listenConsumer));
- }
-
- /**
- * init all task config info
- * @param taskConfig
- * @return
- */
- private List<TargetKeyValue> initTaskKeyInfo(Map<String, List<TargetKeyValue>> taskConfig) {
- Set<TargetKeyValue> targetKeyValues = new HashSet<>();
- for(String connectName : taskConfig.keySet()){
- targetKeyValues.addAll(taskConfig.get(connectName));
- }
- return Lists.newArrayList(targetKeyValues);
+ this.eventSubscriber = eventSubscriber;
}
@Override
public void run() {
- while (!stopped){
- if(CollectionUtils.isEmpty(listenConsumer)){
- logger.info("current listen consumer is empty");
- this.waitForRunning(1000);
- continue;
- }
- for(DefaultLitePullConsumer pullConsumer : listenConsumer) {
- executorService.submit(() -> {
- try {
- List<MessageExt> messageExts = pullConsumer.poll(3000);
- if (CollectionUtils.isEmpty(messageExts)) {
- logger.info("consumer poll message empty , consumer - {}", JSON.toJSONString(pullConsumer));
- return;
- }
- for (MessageExt messageExt : messageExts) {
- ConnectRecord eventRecord = convertToSinkRecord(messageExt);
- listenerFactory.offerEventRecord(eventRecord);
- logger.debug("consumer - {} - offer listen event record - {} - by - message event- {}", JSON.toJSONString(pullConsumer), eventRecord, messageExt);
- }
- } finally {
- pullConsumer.commitSync();
- }
- });
- }
- }
- }
-
- @Override
- public String getServiceName() {
- return this.getClass().getSimpleName();
- }
-
- /**
- * MessageExt convert to connect record
- * @param messageExt
- * @return
- */
- private ConnectRecord convertToSinkRecord(MessageExt messageExt) {
- Map<String, String> properties = messageExt.getProperties();
- Schema schema;
- Long timestamp;
- ConnectRecord sinkRecord;
- String connectTimestamp = properties.get(RuntimerConfigDefine.CONNECT_TIMESTAMP);
- timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
- String connectSchema = properties.get(RuntimerConfigDefine.CONNECT_SCHEMA);
- schema = StringUtils.isNotEmpty(connectSchema) ? JSON.parseObject(connectSchema, Schema.class) : null;
- byte[] body = messageExt.getBody();
- RecordPartition recordPartition = listenerFactory.convertToRecordPartition(messageExt.getTopic(), messageExt.getBrokerName(), messageExt.getQueueId());
- RecordOffset recordOffset = listenerFactory.convertToRecordOffset(messageExt.getQueueOffset());
- String bodyStr = new String(body, StandardCharsets.UTF_8);
- sinkRecord = new ConnectRecord(recordPartition, recordOffset, timestamp, schema, bodyStr);
- KeyValue keyValue = new DefaultKeyValue();
- keyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME, messageExt.getTopic());
- if (MapUtils.isNotEmpty(properties)) {
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- keyValue.put(entry.getKey(), entry.getValue());
- }
+ while (!stopped) {
+ List<ConnectRecord> recordList = eventSubscriber.pull();
+ listenerFactory.offerEventRecords(recordList);
}
- sinkRecord.addExtension(keyValue);
- return sinkRecord;
}
- /**
- * consumer update listener
- */
- class ConsumerUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
-
- @Override
- public void onConfigUpdate(PusherTargetEntity targetEntity) {
- logger.info("consumer update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
- Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
- lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
- initOrUpdateListenConsumer(lastTargetMap);
- }
+ @Override public String getServiceName() {
+ return EventBusListener.class.getSimpleName();
}
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index f14fdec..eaee64c 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -19,19 +19,23 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
import com.alibaba.fastjson.JSON;
import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.concurrent.*;
-
/**
* receive event and transfer the rule to pusher
*/
@@ -41,23 +45,10 @@ public class EventRuleTransfer extends ServiceThread {
private ListenerFactory listenerFactory;
- private PusherConfigManageService pusherConfigManageService;
-
- private Plugin plugin;
+ private ExecutorService executorService = new ThreadPoolExecutor(20, 60, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
- Map<TargetKeyValue/*taskConfig*/, TransformEngine<ConnectRecord>/*taskTransform*/> taskTransformMap = new ConcurrentHashMap<>(20);
-
- private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
-
- public EventRuleTransfer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
- this.plugin = plugin;
+ public EventRuleTransfer(ListenerFactory listenerFactory) {
this.listenerFactory = listenerFactory;
- this.pusherConfigManageService = pusherConfigManageService;
- this.pusherConfigManageService.registerListener(new TransformUpdateListenerImpl());
- }
-
- public void initOrUpdateTaskTransform(Map<String, List<TargetKeyValue>> taskConfig){
- this.taskTransformMap.putAll(initSinkTaskTransformInfo(taskConfig));
}
@Override
@@ -67,9 +58,10 @@ public class EventRuleTransfer extends ServiceThread {
@Override
public void run() {
- while (!stopped){
+ while (!stopped) {
+ //TODO: process with Java 8 parallel streams
ConnectRecord eventRecord = listenerFactory.takeEventRecord();
- if(Objects.isNull(eventRecord)){
+ if (Objects.isNull(eventRecord)) {
logger.info("listen eventRecord is empty, continue by curTime - {}", System.currentTimeMillis());
this.waitForRunning(1000);
continue;
@@ -77,62 +69,22 @@ public class EventRuleTransfer extends ServiceThread {
executorService.submit(() -> {
// extension add sub
// rule - target
- for (TargetKeyValue targetKeyValue : taskTransformMap.keySet()){
- // add threadPool for cup task
- // attention coreSize
- TransformEngine<ConnectRecord> transformEngine = taskTransformMap.get(targetKeyValue);
+ listenerFactory.getTaskTransformMap().entrySet().forEach(entry -> {
+ TransformEngine<ConnectRecord> transformEngine = entry.getValue();
ConnectRecord transformRecord = transformEngine.doTransforms(eventRecord);
- if(Objects.isNull(transformRecord)){
- continue;
+ if (Objects.isNull(transformRecord)) {
+ return;
}
// a bean for maintain
- Map<TargetKeyValue,ConnectRecord> targetMap = new HashMap<>();
- targetMap.put(targetKeyValue, transformRecord);
- listenerFactory.offerTargetTaskQueue(targetMap);
+ listenerFactory.offerTargetTaskQueue(transformRecord);
+ logger.debug("offer target task queue succeed, targetMap - {}", JSON.toJSONString(transformRecord));
+ });
+
- logger.debug("offer target task queue succeed, targetMap - {}", JSON.toJSONString(targetMap));
- // metrics
- // logger
- // key->connectKeyValue to simple name
- // connectRecord add system properties for taskClass info
- }
});
}
}
- /**
- * Init sink task transform map
- * key: task config
- * value: transformEngine
- * @param taskConfig
- * @return
- */
- private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(Map<String, List<TargetKeyValue>> taskConfig) {
- Map<TargetKeyValue, TransformEngine<ConnectRecord>> curTaskTransformMap = new HashMap<>();
- Set<TargetKeyValue> allTaskKeySet = new HashSet<>();
- for(String connectName : taskConfig.keySet()){
- List<TargetKeyValue> taskKeyList = taskConfig.get(connectName);
- allTaskKeySet.addAll(new HashSet<>(taskKeyList));
- }
- for(TargetKeyValue keyValue : allTaskKeySet){
- TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(keyValue, plugin);
- curTaskTransformMap.put(keyValue, transformChain);
- }
- logger.info("init sink task transform info succeed, transform map - {}", JSON.toJSONString(curTaskTransformMap));
- return curTaskTransformMap;
- }
- /**
- * transform update listener
- */
- class TransformUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
- @Override
- public void onConfigUpdate(PusherTargetEntity targetEntity) {
- logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
- Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
- lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
- initOrUpdateTaskTransform(lastTargetMap);
- }
- }
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
index d324fe4..35c6bb2 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -22,27 +22,32 @@ import com.google.common.collect.Lists;
import io.netty.util.internal.ConcurrentSet;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.collections.MapUtils;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-
/**
* event target push to sink task
+ *
* @author artisan
*/
-public class EventTargetPusher extends ServiceThread {
+public class EventTargetPusher extends ServiceThread{
private static final Logger logger = LoggerFactory.getLogger(EventTargetPusher.class);
@@ -56,29 +61,27 @@ public class EventTargetPusher extends ServiceThread {
private ListenerFactory listenerFactory;
- private PusherConfigManageService pusherConfigManageService;
+ private TargetRunnerConfigObserver targetRunnerConfigObserver;
private List<SinkTask> pusherTasks = new CopyOnWriteArrayList<>();
- public EventTargetPusher(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
- this.plugin = plugin;
+ public EventTargetPusher(ListenerFactory listenerFactory) {
this.listenerFactory = listenerFactory;
- this.pusherConfigManageService = pusherConfigManageService;
- this.pusherConfigManageService.registerListener(new TargetUpdateListenerImpl());
}
/**
* init running tasks
+ *
* @param taskConfig
*/
- public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig){
+ public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig) {
Set<TargetKeyValue> taskProperty = new HashSet<>();
- for(String connectName : taskConfig.keySet()){
+ for (String connectName : taskConfig.keySet()) {
List<TargetKeyValue> targetKeyValues = taskConfig.get(connectName);
taskProperty.addAll(new HashSet<>(targetKeyValues));
}
- for(TargetKeyValue targetKeyValue : taskProperty){
- try{
+ for (TargetKeyValue targetKeyValue : taskProperty) {
+ try {
String taskClass = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
ClassLoader loader = plugin.getPluginClassLoader(taskClass);
Class taskClazz;
@@ -98,7 +101,7 @@ public class EventTargetPusher extends ServiceThread {
Plugin.compareAndSwapLoaders(loader);
}
logger.info("init target task succeed, target key - {}", JSON.toJSONString(targetKeyValue));
- }catch (Exception exception){
+ } catch (Exception exception) {
exception.printStackTrace();
}
}
@@ -106,26 +109,26 @@ public class EventTargetPusher extends ServiceThread {
@Override
public void run() {
- while (!stopped){
- Map<TargetKeyValue, ConnectRecord> taskPusher = listenerFactory.takeTargetMap();
- if(MapUtils.isEmpty(taskPusher)){
- logger.info("current target pusher is empty");
- this.waitForRunning(1000);
- continue;
- }
- logger.info("start push content by pusher - {}", JSON.toJSONString(taskPusher));
-
- TargetKeyValue targetKeyValue = taskPusher.keySet().iterator().next();
- // task-id for unique-key at ConnectKeyValue
- // ConnectKeyValue -> new class for name
- // also add in ConnectRecord class system property
- String taskPushName = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
- // add thread pool
- for(SinkTask sinkTask : pusherTasks){
- if(sinkTask.getClass().getName().equals(taskPushName)){
- sinkTask.put(Lists.newArrayList(taskPusher.get(targetKeyValue)));
- }
- }
+ while (!stopped) {
+// Map<TargetKeyValue, ConnectRecord> taskPusher = listenerFactory.takeTargetMap();
+// if (MapUtils.isEmpty(taskPusher)) {
+// logger.info("current target pusher is empty");
+// this.waitForRunning(1000);
+// continue;
+// }
+// logger.info("start push content by pusher - {}", JSON.toJSONString(taskPusher));
+//
+// TargetKeyValue targetKeyValue = taskPusher.keySet().iterator().next();
+// // task-id for unique-key at ConnectKeyValue
+// // ConnectKeyValue -> new class for name
+// // also add in ConnectRecord class system property
+// String taskPushName = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
+// // add thread pool
+// for (SinkTask sinkTask : pusherTasks) {
+// if (sinkTask.getClass().getName().equals(taskPushName)) {
+// sinkTask.put(Lists.newArrayList(taskPusher.get(targetKeyValue)));
+// }
+// }
}
}
@@ -134,17 +137,5 @@ public class EventTargetPusher extends ServiceThread {
return EventTargetPusher.class.getSimpleName();
}
- /**
- * target update listener
- */
- class TargetUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
-
- @Override
- public void onConfigUpdate(PusherTargetEntity targetEntity) {
- logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
- Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
- lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
- initOrUpdatePusherTask(lastTargetMap);
- }
- }
+
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
new file mode 100644
index 0000000..23c499b
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.List;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+public abstract class EventSubscriber implements TargetRunnerListener {
+
+ abstract void refresh();
+
+ /**
+ * Pull connect records from the store; blocks when the store is empty.
+ *
+ * @return
+ */
+ public abstract List<ConnectRecord> pull();
+
+ /**
+ * Commit the connect records.
+ *
+ * @param connectRecordList
+ */
+ public abstract void commit(List<ConnectRecord> connectRecordList);
+
+ /**
+ * Called when a new target runner is added to the runtimer.
+ *
+ * @param targetRunnerConfig
+ */
+ @Override
+ public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ this.refresh();
+ }
+
+ /**
+ * Called when an existing target runner is updated.
+ *
+ * @param targetRunnerConfig
+ */
+ @Override
+ public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+ this.refresh();
+ }
+
+ /**
+ * Called when an existing target runner is deleted from the runtimer.
+ *
+ * @param targetRunnerConfig
+ */
+ @Override
+ public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ this.refresh();
+ }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
index 07f010e..2d22094 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
@@ -17,32 +17,42 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+import com.alibaba.fastjson.JSON;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
@Component
-public class ListenerFactory {
+public class ListenerFactory implements TargetRunnerListener {
private final static Logger logger = LoggerFactory.getLogger(LoggerName.EventBus_Listener);
@@ -52,13 +62,19 @@ public class ListenerFactory {
public static final String QUEUE_OFFSET = "queueOffset";
- private BlockingQueue<PusherTargetEntity> pusherTargetQueue = new LinkedBlockingQueue<>(1000);
+ private Plugin plugin;
+
+ private BlockingQueue<TargetRunnerConfig> pusherTargetQueue = new LinkedBlockingQueue<>(1000);
private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000);
private BlockingQueue<ConnectRecord> eventRecord = new LinkedBlockingQueue<>(50000);
- private BlockingQueue<Map<TargetKeyValue, ConnectRecord>> targetQueue = new LinkedBlockingQueue<>(50000);
+ private BlockingQueue<ConnectRecord> targetQueue = new LinkedBlockingQueue<>(50000);
+
+ private Map<String/*TargetRunnerName*/, TransformEngine<ConnectRecord>> taskTransformMap = new ConcurrentHashMap<>(20);
+ private Map<String/*TargetRunnerName*/, SinkTask> pusherTaskMap = new ConcurrentHashMap<>(20);
+ private Map<String/*TargetRunnerName*/, TargetRunnerConfig> targetRunnerConfigMap = new ConcurrentHashMap<>(20);
@Value("${rocketmq.namesrvAddr:}")
private String namesrvAddr;
@@ -70,14 +86,14 @@ public class ListenerFactory {
consumer.setNamesrvAddr(namesrvAddr);
consumer.subscribe(topic, "*");
consumer.start();
- }catch (Exception exception){
+ } catch (Exception exception) {
logger.error("init default pull consumer exception, topic -" + topic + "-stackTrace-", exception);
}
return consumer;
}
- public PusherTargetEntity takeTaskConfig(){
- if(pusherTargetQueue.isEmpty()){
+ public TargetRunnerConfig takeTaskConfig() {
+ if (pusherTargetQueue.isEmpty()) {
return null;
}
try {
@@ -90,45 +106,47 @@ public class ListenerFactory {
/**
* Offer listener event
+ *
* @param messageExt
* @return
*/
- public boolean offerListenEvent(MessageExt messageExt){
+ public boolean offerListenEvent(MessageExt messageExt) {
return eventMessage.offer(messageExt);
}
public MessageExt takeListenerEvent() {
- if(eventMessage.isEmpty()){
+ if (eventMessage.isEmpty()) {
return null;
}
try {
return eventMessage.take();
- }catch (Exception exception){
+ } catch (Exception exception) {
exception.printStackTrace();
}
return null;
}
/**
- * offer event record
- * @param connectRecord
- * @return
+ * offer event records
+ *
+ * @param connectRecords
*/
- public boolean offerEventRecord(ConnectRecord connectRecord){
- return eventRecord.offer(connectRecord);
+ public boolean offerEventRecords(List<ConnectRecord> connectRecords) {
+ return eventRecord.addAll(connectRecords);
}
/**
* take event record
+ *
* @return
*/
public ConnectRecord takeEventRecord() {
- if(eventRecord.isEmpty()){
+ if (eventRecord.isEmpty()) {
return null;
}
try {
return eventRecord.take();
- }catch (Exception exception){
+ } catch (Exception exception) {
logger.error("take event record exception - stack-> ", exception);
}
return null;
@@ -148,6 +166,7 @@ public class ListenerFactory {
/**
* parse topic list by task config
+ *
* @param taskConfig
* @return
*/
@@ -162,12 +181,13 @@ public class ListenerFactory {
/**
* parse topic list by task config list
+ *
* @param taskConfigs
* @return
*/
public List<String> parseTopicListByList(List<TargetKeyValue> taskConfigs) {
Set<String> allTopicList = Sets.newHashSet();
- for(TargetKeyValue taskConfig : taskConfigs){
+ for (TargetKeyValue taskConfig : taskConfigs) {
String messageQueueStr = taskConfig.getString(RuntimerConfigDefine.CONNECT_TOPICNAME);
if (StringUtils.isBlank(messageQueueStr)) {
continue;
@@ -180,6 +200,7 @@ public class ListenerFactory {
/**
* parse msg queue by queue json
+ *
* @param messageQueueStr
* @return
*/
@@ -207,19 +228,67 @@ public class ListenerFactory {
return recordOffset;
}
- public boolean offerTargetTaskQueue(Map<TargetKeyValue, ConnectRecord> targetMap){
- return targetQueue.offer(targetMap);
+ public boolean offerTargetTaskQueue(ConnectRecord connectRecord) {
+ return targetQueue.offer(connectRecord);
}
- public Map<TargetKeyValue, ConnectRecord> takeTargetMap(){
- if(targetQueue.isEmpty()){
+ public ConnectRecord takeTargetMap() {
+ if (targetQueue.isEmpty()) {
return null;
}
- try{
+ try {
return targetQueue.take();
- }catch (Exception exception){
+ } catch (Exception exception) {
exception.printStackTrace();
}
return null;
}
+
+ /**
+ * Init the sink task transform map (key: task config, value: transform engine).
+ *
+ * @param taskConfig
+ * @return
+ */
+ private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(
+ Map<String, List<TargetKeyValue>> taskConfig) {
+ Map<TargetKeyValue, TransformEngine<ConnectRecord>> curTaskTransformMap = new HashMap<>();
+ Set<TargetKeyValue> allTaskKeySet = new HashSet<>();
+ for (String connectName : taskConfig.keySet()) {
+ List<TargetKeyValue> taskKeyList = taskConfig.get(connectName);
+ allTaskKeySet.addAll(new HashSet<>(taskKeyList));
+ }
+ for (TargetKeyValue keyValue : allTaskKeySet) {
+ TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(keyValue, plugin);
+ curTaskTransformMap.put(keyValue, transformChain);
+ }
+ logger.info("init sink task transform info succeed, transform map - {}", JSON.toJSONString(curTaskTransformMap));
+ return curTaskTransformMap;
+ }
+
+
+ @Override public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ // refresh taskTransformMap & pusherTaskMap & targetRunnerConfigMap
+ }
+
+ @Override public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+ }
+
+ @Override public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+ }
+
+ public Map<String, TransformEngine<ConnectRecord>> getTaskTransformMap() {
+ return taskTransformMap;
+ }
+
+ public Map<String, SinkTask> getPusherTaskMap() {
+ return pusherTaskMap;
+ }
+
+ public Map<String, TargetRunnerConfig> getTargetRunnerConfigMap() {
+ return targetRunnerConfigMap;
+ }
+
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
new file mode 100644
index 0000000..7c8e5a3
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.List;
+
+public class RocketMQEventSubscriber extends EventSubscriber {
+
+ private ListenerFactory listenerFactory;
+
+ public RocketMQEventSubscriber(
+ ListenerFactory listenerFactory) {
+ this.listenerFactory = listenerFactory;
+ }
+
+ @Override void refresh() {
+
+ }
+
+ @Override public List<ConnectRecord> pull() {
+ return null;
+ }
+
+ @Override
+ public void commit(List<ConnectRecord> connectRecordList) {
+
+ }
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
new file mode 100644
index 0000000..f528b7b
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+public interface TargetRunnerListener {
+
+ /**
+ * Called when a new target runner is added to the runtimer.
+ *
+ * @param targetRunnerConfig
+ */
+ void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+ /**
+ * Called when an existing target runner is updated.
+ *
+ * @param targetRunnerConfig
+ */
+ void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+ /**
+ * Called when an existing target runner is deleted from the runtimer.
+ *
+ * @param targetRunnerConfig
+ */
+ void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
similarity index 54%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java
rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
index dc7ced4..45bc5f1 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
@@ -17,34 +17,45 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity;
-import lombok.Data;
-
import java.io.Serializable;
-import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import lombok.Data;
/**
* pusher target key config
*/
@Data
-public class PusherTargetEntity implements Serializable {
+public class TargetRunnerConfig implements Serializable {
- private String connectName;
+ private String name;
- private List<TargetKeyValue> targetKeyValues;
+ /**
+ * All data is stored in this map.
+ */
+ private Map<String, String> properties;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ TargetRunnerConfig config = (TargetRunnerConfig) o;
+ return Objects.equals(name, config.name) && Objects.equals(properties, config.properties);
+ }
@Override
- public boolean equals(Object object){
- if (object != null && object.getClass() == this.getClass()) {
- PusherTargetEntity targetEntity = (PusherTargetEntity) object;
- return this.connectName.equals(targetEntity.getConnectName())
- && this.targetKeyValues.size() == targetEntity.getTargetKeyValues().size()
- && this.targetKeyValues.containsAll(targetEntity.getTargetKeyValues());
- }
- return false;
+ public int hashCode() {
+ return Objects.hash(name, properties);
}
@Override
- public int hashCode(){
- return this.connectName.hashCode() + this.targetKeyValues.hashCode();
+ public String toString() {
+ //TODO
+ return "TargetRunnerConfig{" +
+ "connectName='" + name + '\'' +
+ ", properties=" + properties +
+ '}';
}
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
index ebf1c88..500f300 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
@@ -80,6 +80,8 @@ public class RuntimerConfigDefine {
public static final String CONNECT_TOPICNAME = "connect-topicname";
+ public static final String CONNECT_RULE_NAME = "connect-rule-name";
+
public static final String CONNECT_TOPICNAMES = "connect-topicnames";
public static final String CONNECT_SOURCE_PARTITION = "connect-source-partition";
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
new file mode 100644
index 0000000..619bc63
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import com.google.common.collect.Sets;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+@Service
+public abstract class AbstractTargetRunnerConfigObserver implements TargetRunnerConfigObserver {
+
+ private Set<TargetRunnerConfig> targetRunnerConfigs = Sets.newHashSet();
+
+ /**
+ * All listeners to trigger when the config changes.
+ */
+ private Set<TargetRunnerListener> targetRunnerConfigListeners = new HashSet<>();
+
+ public Set<TargetRunnerConfig> getTargetRunnerConfig() {
+ return targetRunnerConfigs;
+ }
+
+ public abstract Set<TargetRunnerConfig> getLatestTargetRunnerConfig();
+
+ @Override
+ public void registerListener(TargetRunnerListener listener) {
+ this.targetRunnerConfigListeners.add(listener);
+ }
+
+ void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ this.targetRunnerConfigs.add(targetRunnerConfig);
+ if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+ return;
+ }
+ for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+ listener.onAddTargetRunner(targetRunnerConfig);
+ }
+ }
+
+ /**
+ * Called when an existing target runner is updated.
+ *
+ * @param targetRunnerConfig
+ */
+ void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ this.targetRunnerConfigs.add(targetRunnerConfig);
+ if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+ return;
+ }
+ for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+ listener.onUpdateTargetRunner(targetRunnerConfig);
+ }
+ }
+
+ /**
+ * Called when an existing target runner is deleted from the runtimer.
+ *
+ * @param targetRunnerConfig
+ */
+ void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ this.targetRunnerConfigs.remove(targetRunnerConfig);
+ if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+ return;
+ }
+ for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+ listener.onDeleteTargetRunner(targetRunnerConfig);
+ }
+ }
+
+ public Map<String, List<TargetKeyValue>> getTaskConfigs() {
+ return null;
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
deleted file mode 100644
index 58a66fd..0000000
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
-
-import com.google.common.collect.Lists;
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.component.connector.Connector;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.FilePathConfigUtil;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.FileBaseKeyValueStore;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.KeyValueStore;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.JsonConverter;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.ListConverter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-import org.springframework.util.CollectionUtils;
-
-import javax.annotation.PostConstruct;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-@Service
-public class PusherConfigManageServiceImpl implements PusherConfigManageService {
-
- /**
- * plugin for recognize class loader
- */
- private Plugin plugin;
-
- /**
- * Current connector configs in the store.
- */
- private KeyValueStore<String, TargetKeyValue> connectorKeyValueStore;
-
- /**
- * Current task configs in the store.
- */
- private KeyValueStore<String, List<TargetKeyValue>> taskKeyValueStore;
-
- /**
- * All listeners to trigger while config change.
- */
- private Set<TargetConfigUpdateListener> targetConfigUpdateListeners;
-
- private Set<String> connectTopicNames;
-
- @Value("${runtimer.storePathRootDir:}")
- private String storeRootPath;
-
- public PusherConfigManageServiceImpl(Plugin plugin){
- this.plugin = plugin;
- this.connectTopicNames = new CopyOnWriteArraySet<>();
- this.targetConfigUpdateListeners = new HashSet<>();
- }
-
- @PostConstruct
- public void initStoreKeyValue(){
- this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
- FilePathConfigUtil.getConnectorConfigPath(this.storeRootPath),
- new JsonConverter(),
- new JsonConverter(TargetKeyValue.class));
- this.taskKeyValueStore = new FileBaseKeyValueStore<>(
- FilePathConfigUtil.getTaskConfigPath(this.storeRootPath),
- new JsonConverter(),
- new ListConverter(TargetKeyValue.class));
- this.connectorKeyValueStore.load();
- this.taskKeyValueStore.load();
- }
-
- /**
- * get all connector configs enabled
- *
- * @return
- */
- @Override
- public Map<String, TargetKeyValue> getConnectorConfigs() {
- Map<String, TargetKeyValue> result = new HashMap<>();
- Map<String, TargetKeyValue> connectorConfigs = connectorKeyValueStore.getKVMap();
- for (String connectorName : connectorConfigs.keySet()) {
- TargetKeyValue config = connectorConfigs.get(connectorName);
- if (0 != config.getInt(RuntimerConfigDefine.CONFIG_DELETED)) {
- continue;
- }
- result.put(connectorName, config);
- }
- return result;
- }
-
- @Override
- public String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception {
- TargetKeyValue exist = connectorKeyValueStore.get(connectorName);
- if (null != exist) {
- Long updateTimestamp = exist.getLong(RuntimerConfigDefine.UPDATE_TIMESTAMP);
- if (null != updateTimestamp) {
- configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, updateTimestamp);
- }
- }
- if (configs.equals(exist)) {
- return "Connector with same config already exist.";
- }
-
- Long currentTimestamp = System.currentTimeMillis();
- configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
- for (String requireConfig : RuntimerConfigDefine.REQUEST_CONFIG) {
- if (!configs.containsKey(requireConfig)) {
- return "Request config key: " + requireConfig;
- }
- }
-
- String connectorClass = configs.getString(RuntimerConfigDefine.CONNECTOR_CLASS);
- ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
- Class clazz;
- if (null != classLoader) {
- clazz = Class.forName(connectorClass, true, classLoader);
- } else {
- clazz = Class.forName(connectorClass);
- }
- final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
- connector.validate(configs);
- connector.init(configs);
- connectorKeyValueStore.put(connectorName, configs);
- recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
- return "";
- }
-
- @Override
- public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs) {
- int maxTask = configs.getInt(RuntimerConfigDefine.MAX_TASK, 1);
- List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
- List<TargetKeyValue> converterdConfigs = new ArrayList<>();
- for (KeyValue keyValue : taskConfigs) {
- TargetKeyValue newKeyValue = new TargetKeyValue();
- for (String key : keyValue.keySet()) {
- newKeyValue.put(key, keyValue.getString(key));
- }
- newKeyValue.put(RuntimerConfigDefine.TASK_CLASS, connector.taskClass().getName());
- newKeyValue.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
-
- newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
- newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAMES));
- Set<String> connectConfigKeySet = configs.keySet();
- for (String connectConfigKey : connectConfigKeySet) {
- if (connectConfigKey.startsWith(RuntimerConfigDefine.TRANSFORMS)) {
- newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
- }
- }
- converterdConfigs.add(newKeyValue);
- connectTopicNames.add(configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
- }
- putTaskConfigs(connectorName, converterdConfigs);
- }
-
- @Override
- public void removeConnectorConfig(String connectorName) {
- TargetKeyValue config = connectorKeyValueStore.get(connectorName);
- if(Objects.isNull(config)){
- return;
- }
- config.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, System.currentTimeMillis());
- config.put(RuntimerConfigDefine.CONFIG_DELETED, 1);
- List<TargetKeyValue> taskConfigList = taskKeyValueStore.get(connectorName);
- taskConfigList.add(config);
- connectorKeyValueStore.put(connectorName, config);
- putTaskConfigs(connectorName, taskConfigList);
- }
-
- @Override
- public Map<String, List<TargetKeyValue>> getTaskConfigs() {
- Map<String, List<TargetKeyValue>> result = new HashMap<>();
- Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap();
- Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs();
- for (String connectorName : taskConfigs.keySet()) {
- if (!filteredConnector.containsKey(connectorName)) {
- continue;
- }
- result.put(connectorName, taskConfigs.get(connectorName));
- }
- return result;
- }
-
- @Override
- public Set<PusherTargetEntity> getTargetInfo() {
- Set<PusherTargetEntity> result = new HashSet<>();
- Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap();
- Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs();
- for (String connectorName : taskConfigs.keySet()) {
- if (!filteredConnector.containsKey(connectorName)) {
- continue;
- }
- PusherTargetEntity targetEntity = new PusherTargetEntity();
- targetEntity.setConnectName(connectorName);
- targetEntity.setTargetKeyValues(taskConfigs.get(connectorName));
- result.add(targetEntity);
- }
- return result;
- }
-
- @Override
- public List<String> getConnectTopics(){
- if(CollectionUtils.isEmpty(connectTopicNames)){
- return Lists.newArrayList();
- }
- return Lists.newArrayList(connectTopicNames);
- }
-
- @Override
- public void persist() {
- this.connectorKeyValueStore.persist();
- this.taskKeyValueStore.persist();
- }
-
- @Override
- public void registerListener(TargetConfigUpdateListener listener) {
- this.targetConfigUpdateListeners.add(listener);
- }
-
- /**
- * put target task key config for update
- * @param connectorName
- * @param configs
- */
- private void putTaskConfigs(String connectorName, List<TargetKeyValue> configs) {
- List<TargetKeyValue> exist = taskKeyValueStore.get(connectorName);
- if (null != exist && exist.size() > 0) {
- taskKeyValueStore.remove(connectorName);
- }
- taskKeyValueStore.put(connectorName, configs);
- PusherTargetEntity targetEntity = new PusherTargetEntity();
- targetEntity.setConnectName(connectorName);
- targetEntity.setTargetKeyValues(configs);
- triggerListener(targetEntity);
- persistStore();
- }
-
- private void persistStore() {
-
- }
-
- /**
- * trigger new target task config for update
- * @param pusherTargetEntity
- */
- private void triggerListener(PusherTargetEntity pusherTargetEntity) {
- if (CollectionUtils.isEmpty(this.targetConfigUpdateListeners)) {
- return;
- }
-
- for (TargetConfigUpdateListener listener : this.targetConfigUpdateListeners) {
- listener.onConfigUpdate(pusherTargetEntity);
- }
- }
-
-}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
similarity index 50%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java
rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
index 88a2001..c6c7670 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
@@ -17,82 +17,31 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
-import io.openmessaging.connector.api.component.connector.Connector;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
/**
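- * manage the pusher connector/task config info
+ * Exposes the target runner configs of the runtimer and lets listeners register for config update operations.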
*/
-public interface PusherConfigManageService {
-
- /**
- * Get all connector configs
- *
- * @return
- */
- Map<String, TargetKeyValue> getConnectorConfigs();
-
- /**
- * Put the configs.
- *
- * @param connectorName
- * @param configs
- * @return
- * @throws Exception
- */
- String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception;
-
- /**
- * Remove the connector
- *
- * @param connectorName
- */
- void removeConnectorConfig(String connectorName);
-
- void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs);
+public interface TargetRunnerConfigObserver {
/**
- * Get all Task configs.
- *
+ * Get the target runner config of runtimer.
* @return
*/
- Map<String, List<TargetKeyValue>> getTaskConfigs();
-
- /**
- * get target info
- * @return
- */
- Set<PusherTargetEntity> getTargetInfo();
-
- /**
- * Get all topics
- * @return
- */
- List<String> getConnectTopics();
-
- /**
- * Persist all the configs in a store.
- */
- void persist();
+ Set<TargetRunnerConfig> getTargetRunnerConfig();
/**
* Register a listener to listen all config update operations.
*
* @param listener
*/
- void registerListener(TargetConfigUpdateListener listener);
-
- interface TargetConfigUpdateListener {
-
- /**
- * Invoke while connector config changed.
- */
- void onConfigUpdate(PusherTargetEntity targetEntity);
- }
+ void registerListener(TargetRunnerListener listener);
+ @Deprecated
+ Map<String, List<TargetKeyValue>> getTaskConfigs();
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java
new file mode 100644
index 0000000..f51ac50
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+/**
+ * Target runner config observer that is driven from outside: it runs no scheduler or watcher of
+ * its own, and callers report changes through {@link #add}, {@link #update} and {@link #delete}.
+ * NOTE(review): presumably invoked by the controller/API layer (inferred from the class name) - confirm.
+ */
+@Slf4j
+public class TargetRunnerConfigOnControllerObserver extends AbstractTargetRunnerConfigObserver {
+
+ /**
+ * Returns a new, empty, mutable set on every call; this observer does not load configs itself.
+ *
+ * @return an empty set, never null
+ */
+ @Override
+ public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+ return Sets.newHashSet();
+ }
+
+ /**
+ * Reports a newly added target runner by delegating to the inherited onAddTargetRunner callback.
+ * What that callback does (e.g. notifying listeners) is defined in the superclass, not here.
+ *
+ * @param targetRunnerConfig the added config, passed through unchanged
+ */
+ public void add(TargetRunnerConfig targetRunnerConfig) {
+ super.onAddTargetRunner(targetRunnerConfig);
+ }
+
+ /**
+ * Reports a changed target runner by delegating to the inherited onUpdateTargetRunner callback.
+ *
+ * @param targetRunnerConfig the updated config, passed through unchanged
+ */
+ public void update(TargetRunnerConfig targetRunnerConfig) {
+ super.onUpdateTargetRunner(targetRunnerConfig);
+ }
+
+ /**
+ * Reports a removed target runner by delegating to the inherited onDeleteTargetRunner callback.
+ *
+ * @param targetRunnerConfig the deleted config, passed through unchanged
+ */
+ public void delete(TargetRunnerConfig targetRunnerConfig) {
+ super.onDeleteTargetRunner(targetRunnerConfig);
+ }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
new file mode 100644
index 0000000..3d847a1
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+
+/**
+ * Target runner config observer that runs a polling task on a fixed 3-second schedule.
+ * NOTE(review): presumably meant to read configs from a database (inferred from the class name);
+ * nothing in this class accesses one yet, and the polling body is still a placeholder.
+ */
+@Slf4j
+public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigObserver {
+
+ // Static, so one single-threaded scheduler is shared by every instance of this class.
+ private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.newThreadFactory("TargetRunnerConfigOnDBObserver", false));
+
+ /**
+ * Schedules the polling task immediately; note that {@code this} is handed out before
+ * construction has finished.
+ *
+ * @param plugin not used by this constructor
+ */
+ public TargetRunnerConfigOnDBObserver(Plugin plugin) {
+ this.addListen(this);
+ }
+
+ /**
+ * Placeholder: always returns null.
+ *
+ * @return null
+ */
+ @Override
+ public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+ return null;
+ }
+
+ /**
+ * Schedules the polling task with no initial delay and a 3-second period.
+ *
+ * @param pusherConfigOnFileService not used; the task works on {@code this}
+ */
+ public void addListen(
+ TargetRunnerConfigOnDBObserver pusherConfigOnFileService) {
+ service.scheduleAtFixedRate(() -> {
+ try {
+ // Both sets are fetched but not compared yet.
+ Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
+ Set<TargetRunnerConfig> last = super.getTargetRunnerConfig();
+ // NOTE(review): 'changed' is always null, so all three callbacks below receive null
+ // on every run - confirm the superclass tolerates that, or compute the real difference.
+ TargetRunnerConfig changed = null;
+ super.onAddTargetRunner(changed);
+ super.onUpdateTargetRunner(changed);
+ super.onDeleteTargetRunner(changed);
+ } catch (Throwable e) {
+ // Log instead of propagating: an exception escaping the task would stop later runs.
+ log.error("Watch failed.", e);
+ }
+ }, 0, 3, TimeUnit.SECONDS);
+ }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
new file mode 100644
index 0000000..27269b8
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+import org.springframework.beans.factory.annotation.Value;
+
+/**
+ * Target runner config observer that watches a directory for create/delete/modify events and
+ * calls {@link #diff()} whenever the directory reports a change.
+ *
+ * <p>The watch loop blocks the single thread of the static scheduler for as long as the watch is
+ * healthy; the 3-second schedule only serves to re-establish the watch after a failure.
+ * NOTE(review): because the scheduler is static and single-threaded, a second instance of this
+ * class would not get to run its own watch while the first one is blocked - confirm only one
+ * instance is created.
+ */
+@Slf4j
+public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver {
+
+    @Value("${runtimer.storePathRootDir:}")
+    private String storeRootPath;
+
+    @Value("${runtimer.store.targetRunner.config:targetRunner-config}")
+    private String fileName;
+
+    private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.newThreadFactory("TargetRunnerConfigOnFileObserver", false));
+
+    /**
+     * Starts watching as soon as the observer is constructed.
+     *
+     * <p>NOTE(review): field-level {@code @Value} injection happens after the constructor has
+     * returned (and only if Spring manages this object), so {@code storeRootPath} and
+     * {@code fileName} are presumably still null here and the watch task will fail and be retried
+     * every 3 seconds - confirm, and consider constructor injection or a post-construct hook.
+     *
+     * @param plugin not used by this constructor
+     */
+    public TargetRunnerConfigOnFileObserver(Plugin plugin) {
+        this.addListen(storeRootPath, fileName, this);
+    }
+
+    /**
+     * Placeholder: always returns null.
+     *
+     * @return null
+     */
+    @Override
+    public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+        return null;
+    }
+
+    /**
+     * Schedules the directory watch task (no initial delay, 3-second period).
+     *
+     * <p>Each run registers a fresh watch on {@code pathStr} and then blocks, handling events until
+     * the watch fails, the watch key becomes invalid, or the thread is interrupted. The watch
+     * service is closed when a run ends, so retries do not leak watchers.
+     *
+     * @param pathStr                   directory to watch
+     * @param fileName                  only used in the start-up log line; events are not filtered by name
+     * @param pusherConfigOnFileService observer whose {@link #diff()} is called on every change
+     */
+    public void addListen(String pathStr, String fileName,
+        TargetRunnerConfigOnFileObserver pusherConfigOnFileService) {
+        log.info("Watching task file changing:{}", pathStr + fileName);
+        service.scheduleAtFixedRate(() -> {
+            // try-with-resources: release the watcher when this run ends for any reason.
+            try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
+                Path path = Paths.get(pathStr);
+                path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE,
+                    StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
+                boolean keyValid = true;
+                while (keyValid) {
+                    // take() blocks until a key is signalled; it never returns null.
+                    WatchKey watchKey = watchService.take();
+                    if (!watchKey.pollEvents().isEmpty()) {
+                        log.info("Watched the file changed events.");
+                        pusherConfigOnFileService.diff();
+                    }
+                    // reset() returns false once the key is invalid (e.g. the directory is gone).
+                    // Leave the loop then, otherwise take() would block forever on a dead watch.
+                    keyValid = watchKey.reset();
+                }
+                log.warn("Watch key for {} is no longer valid, the watch will be registered again.", pathStr);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the executor instead of reporting it as a failure.
+                Thread.currentThread().interrupt();
+                log.info("File watcher interrupted, stop watching.");
+            } catch (Throwable e) {
+                // Log instead of propagating: an exception escaping the task would stop later runs.
+                log.error("Watch file failed.", e);
+            }
+        }, 0, 3, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Placeholder for comparing the latest configs with the previously known ones.
+     *
+     * <p>NOTE(review): both sets are fetched but not compared, and {@code changed} is always null,
+     * so every callback below receives null - confirm the superclass tolerates that, or compute
+     * the real difference here.
+     */
+    public void diff() {
+        Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
+        Set<TargetRunnerConfig> last = super.getTargetRunnerConfig();
+        TargetRunnerConfig changed = null;
+        super.onAddTargetRunner(changed);
+        super.onUpdateTargetRunner(changed);
+        super.onDeleteTargetRunner(changed);
+    }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/resources/target-runner.config b/adapter/runtimer/src/main/resources/target-runner.config
new file mode 100644
index 0000000..04458c6
--- /dev/null
+++ b/adapter/runtimer/src/main/resources/target-runner.config
@@ -0,0 +1,7 @@
+[
+ {
+ },
+ {
+
+ }
+]
\ No newline at end of file