You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2023/03/15 09:52:06 UTC
[rocketmq-eventbridge] 01/01: feat:add TargetRunnerConfigObserver
This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch shenlin/runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit 2a24698066e51ca6bdc74625a09228339d162ea1
Author: 2011shenlin <20...@gmail.com>
AuthorDate: Wed Mar 15 17:51:40 2023 +0800
feat:add TargetRunnerConfigObserver
---
.../RocketMQConnectTargetRunnerAPIImpl.java | 40 +--
.../eventbridge/adapter/runtimer/Runtimer.java | 24 +-
.../adapter/runtimer/boot/EventBusListener.java | 80 +++---
.../adapter/runtimer/boot/EventRuleTransfer.java | 82 ++++---
.../adapter/runtimer/boot/EventTargetPusher.java | 73 +++---
.../runtimer/boot/listener/ListenerFactory.java | 6 +-
.../boot/listener/TargetRunnerListener.java | 44 ++++
...erTargetEntity.java => TargetRunnerConfig.java} | 4 +-
.../AbstractTargetRunnerConfigObserver.java | 95 ++++++++
.../service/PusherConfigManageServiceImpl.java | 271 ---------------------
...ervice.java => TargetRunnerConfigObserver.java} | 69 +-----
.../TargetRunnerConfigOnControllerObserver.java | 48 ++++
.../service/TargetRunnerConfigOnDBObserver.java | 60 +++++
.../service/TargetRunnerConfigOnFileObserver.java | 92 +++++++
14 files changed, 517 insertions(+), 471 deletions(-)
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
index fda982c..8971028 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java
@@ -20,11 +20,8 @@ package org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
-
import java.util.ArrayList;
import java.util.Map;
-import java.util.Objects;
-
import lombok.SneakyThrows;
import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectSourceRunnerContext;
import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectTargetRunnerContext;
@@ -32,7 +29,6 @@ import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.ActionStatus
import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.CreateSinkConnectorRequest;
import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.TransformRequest;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
import org.apache.rocketmq.eventbridge.domain.common.enums.EventTargetStatusEnum;
import org.apache.rocketmq.eventbridge.domain.model.Component;
import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
@@ -45,13 +41,10 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
private final RocketMQConnectClient rocketMQConnectClient;
- private PusherConfigManageService pusherConfigManageService;
-
public RocketMQConnectTargetRunnerAPIImpl(EventDataRepository eventDataRepository,
- RocketMQConnectClient rocketMQConnectClient, PusherConfigManageService pusherConfigManageService) {
+ RocketMQConnectClient rocketMQConnectClient) {
super(eventDataRepository);
this.rocketMQConnectClient = rocketMQConnectClient;
- this.pusherConfigManageService = pusherConfigManageService;
}
@SneakyThrows
@@ -64,14 +57,9 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
TransformRequest filterTransform = this.buildEventBridgeFilterTransform(filterPattern);
TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
- sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
- if(Objects.nonNull(pusherConfigManageService)){
- pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue);
- }else {
- // todo delete
- rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
- sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
- }
+ sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+ rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
+ sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
return new Gson().toJson(context);
}
@@ -87,17 +75,11 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform);
//create
TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass,
- sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
- if(Objects.nonNull(pusherConfigManageService)){
- pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue);
- }else {
- // todo delete
- //stop
- this.delete(runContext);
-
- rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
- sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
- }
+ sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
+ //stop
+ this.delete(runContext);
+ rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass,
+ sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform));
RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue));
return new Gson().toJson(context);
}
@@ -133,6 +115,7 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
/**
* init sink task config
+ *
* @param name
* @param topicName
* @param sinkClass
@@ -140,7 +123,8 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem
* @param transforms
* @return
*/
- private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass, Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) {
+ private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass,
+ Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) {
CreateSinkConnectorRequest request = new CreateSinkConnectorRequest();
request.setName(name);
request.setTopicName(topicName);
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
index 896885d..44745d2 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -25,7 +25,7 @@ import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -55,7 +55,7 @@ public class Runtimer extends ServiceThread{
private ListenerFactory listenerFactory;
- private PusherConfigManageService pusherConfigManageService;
+ private TargetRunnerConfigObserver targetRunnerConfigObserver;
private Map<String, List<TargetKeyValue>> taskConfigs = new HashMap<>();
@@ -67,21 +67,21 @@ public class Runtimer extends ServiceThread{
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((Runnable r) -> new Thread(r, "RuntimerScheduledThread"));
- public Runtimer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService configManageService) {
+ public Runtimer(Plugin plugin, ListenerFactory listenerFactory, TargetRunnerConfigObserver configManageService) {
this.plugin = plugin;
this.listenerFactory = listenerFactory;
- this.pusherConfigManageService = configManageService;
+ this.targetRunnerConfigObserver = configManageService;
}
@PostConstruct
public void initAndStart() {
logger.info("init runtimer task config");
- this.taskConfigs = pusherConfigManageService.getTaskConfigs();
- listener = new EventBusListener(listenerFactory, pusherConfigManageService);
+ this.taskConfigs = targetRunnerConfigObserver.getTaskConfigs();
+ listener = new EventBusListener(listenerFactory, targetRunnerConfigObserver);
listener.initOrUpdateListenConsumer(taskConfigs);
- transfer = new EventRuleTransfer(plugin, listenerFactory, pusherConfigManageService);
+ transfer = new EventRuleTransfer(plugin, listenerFactory, targetRunnerConfigObserver);
transfer.initOrUpdateTaskTransform(taskConfigs);
- pusher = new EventTargetPusher(plugin, listenerFactory, pusherConfigManageService);
+ pusher = new EventTargetPusher(plugin, listenerFactory, targetRunnerConfigObserver);
pusher.initOrUpdatePusherTask(taskConfigs);
startRuntimer();
}
@@ -105,13 +105,5 @@ public class Runtimer extends ServiceThread{
pusher.start();
- scheduledExecutorService.scheduleAtFixedRate(() -> {
- try {
- this.pusherConfigManageService.persist();
- } catch (Exception e) {
- logger.error("schedule persist config error.", e);
- }
- }, 500, 500, TimeUnit.MILLISECONDS);
-
}
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
index 0c40235..8f3c707 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
@@ -25,31 +25,43 @@ import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.internal.DefaultKeyValue;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.QueueState;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.*;
-
/**
* listen the event and offer to queue
+ *
* @author artisan
*/
-public class EventBusListener extends ServiceThread {
+public class EventBusListener extends ServiceThread implements TargetRunnerListener {
private Logger logger = LoggerFactory.getLogger(EventBusListener.class);
@@ -63,32 +75,33 @@ public class EventBusListener extends ServiceThread {
private ListenerFactory listenerFactory;
- private PusherConfigManageService pusherConfigManageService;
+ private TargetRunnerConfigObserver targetRunnerConfigObserver;
- private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
+ private ExecutorService executorService = new ThreadPoolExecutor(20, 60, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000);
- public EventBusListener(ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
+ public EventBusListener(ListenerFactory listenerFactory, TargetRunnerConfigObserver targetRunnerConfigObserver) {
this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256);
this.messageQueuesStateMap = new ConcurrentHashMap<>(256);
this.listenerFactory = listenerFactory;
- this.pusherConfigManageService = pusherConfigManageService;
- this.pusherConfigManageService.registerListener(new ConsumerUpdateListenerImpl());
+ this.targetRunnerConfigObserver = targetRunnerConfigObserver;
+ this.targetRunnerConfigObserver.registerListener(this);
}
/**
* init listen consumer
+ *
* @param taskConfig
*/
- public void initOrUpdateListenConsumer(Map<String, List<TargetKeyValue>> taskConfig){
- if(MapUtils.isEmpty(taskConfig)){
+ public void initOrUpdateListenConsumer(Map<String, List<TargetKeyValue>> taskConfig) {
+ if (MapUtils.isEmpty(taskConfig)) {
logger.warn("initListenConsumer by taskConfig param is empty");
return;
}
List<TargetKeyValue> targetKeyValues = initTaskKeyInfo(taskConfig);
this.topics.addAll(listenerFactory.parseTopicListByList(targetKeyValues));
- for (String topic : topics){
+ for (String topic : topics) {
DefaultLitePullConsumer pullConsumer = listenerFactory.initDefaultMQPullConsumer(topic);
listenConsumer.add(pullConsumer);
}
@@ -97,12 +110,13 @@ public class EventBusListener extends ServiceThread {
/**
* init all task config info
+ *
* @param taskConfig
* @return
*/
private List<TargetKeyValue> initTaskKeyInfo(Map<String, List<TargetKeyValue>> taskConfig) {
Set<TargetKeyValue> targetKeyValues = new HashSet<>();
- for(String connectName : taskConfig.keySet()){
+ for (String connectName : taskConfig.keySet()) {
targetKeyValues.addAll(taskConfig.get(connectName));
}
return Lists.newArrayList(targetKeyValues);
@@ -110,13 +124,13 @@ public class EventBusListener extends ServiceThread {
@Override
public void run() {
- while (!stopped){
- if(CollectionUtils.isEmpty(listenConsumer)){
+ while (!stopped) {
+ if (CollectionUtils.isEmpty(listenConsumer)) {
logger.info("current listen consumer is empty");
this.waitForRunning(1000);
continue;
}
- for(DefaultLitePullConsumer pullConsumer : listenConsumer) {
+ for (DefaultLitePullConsumer pullConsumer : listenConsumer) {
executorService.submit(() -> {
try {
List<MessageExt> messageExts = pullConsumer.poll(3000);
@@ -144,6 +158,7 @@ public class EventBusListener extends ServiceThread {
/**
* MessageExt convert to connect record
+ *
* @param messageExt
* @return
*/
@@ -175,14 +190,21 @@ public class EventBusListener extends ServiceThread {
/**
* consumer update listener
*/
- class ConsumerUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
-
- @Override
- public void onConfigUpdate(PusherTargetEntity targetEntity) {
- logger.info("consumer update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
- Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
- lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
- initOrUpdateListenConsumer(lastTargetMap);
- }
+ @Override
+ public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ logger.info("consumer update by new target config changed, target info -{}", JSON.toJSONString(targetRunnerConfig));
+ Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
+ lastTargetMap.put(targetRunnerConfig.getConnectName(), targetRunnerConfig.getTargetKeyValues());
+ initOrUpdateListenConsumer(lastTargetMap);
}
+
+ @Override
+ public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ }
+
+ @Override
+ public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+ }
+
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index f14fdec..be73b2a 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -19,44 +19,54 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
import com.alibaba.fastjson.JSON;
import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.concurrent.*;
-
/**
* receive event and transfer the rule to pusher
*/
-public class EventRuleTransfer extends ServiceThread {
+public class EventRuleTransfer extends ServiceThread implements TargetRunnerListener {
private static final Logger logger = LoggerFactory.getLogger(EventRuleTransfer.class);
private ListenerFactory listenerFactory;
- private PusherConfigManageService pusherConfigManageService;
+ private TargetRunnerConfigObserver targetRunnerConfigObserver;
private Plugin plugin;
Map<TargetKeyValue/*taskConfig*/, TransformEngine<ConnectRecord>/*taskTransform*/> taskTransformMap = new ConcurrentHashMap<>(20);
- private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
+ private ExecutorService executorService = new ThreadPoolExecutor(20, 60, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
- public EventRuleTransfer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
+ public EventRuleTransfer(Plugin plugin, ListenerFactory listenerFactory,
+ TargetRunnerConfigObserver targetRunnerConfigObserver) {
this.plugin = plugin;
this.listenerFactory = listenerFactory;
- this.pusherConfigManageService = pusherConfigManageService;
- this.pusherConfigManageService.registerListener(new TransformUpdateListenerImpl());
+ this.targetRunnerConfigObserver = targetRunnerConfigObserver;
+ this.targetRunnerConfigObserver.registerListener(this);
}
- public void initOrUpdateTaskTransform(Map<String, List<TargetKeyValue>> taskConfig){
+ public void initOrUpdateTaskTransform(Map<String, List<TargetKeyValue>> taskConfig) {
this.taskTransformMap.putAll(initSinkTaskTransformInfo(taskConfig));
}
@@ -67,9 +77,9 @@ public class EventRuleTransfer extends ServiceThread {
@Override
public void run() {
- while (!stopped){
+ while (!stopped) {
ConnectRecord eventRecord = listenerFactory.takeEventRecord();
- if(Objects.isNull(eventRecord)){
+ if (Objects.isNull(eventRecord)) {
logger.info("listen eventRecord is empty, continue by curTime - {}", System.currentTimeMillis());
this.waitForRunning(1000);
continue;
@@ -77,16 +87,16 @@ public class EventRuleTransfer extends ServiceThread {
executorService.submit(() -> {
// extension add sub
// rule - target
- for (TargetKeyValue targetKeyValue : taskTransformMap.keySet()){
+ for (TargetKeyValue targetKeyValue : taskTransformMap.keySet()) {
// add threadPool for cup task
// attention coreSize
TransformEngine<ConnectRecord> transformEngine = taskTransformMap.get(targetKeyValue);
ConnectRecord transformRecord = transformEngine.doTransforms(eventRecord);
- if(Objects.isNull(transformRecord)){
+ if (Objects.isNull(transformRecord)) {
continue;
}
// a bean for maintain
- Map<TargetKeyValue,ConnectRecord> targetMap = new HashMap<>();
+ Map<TargetKeyValue, ConnectRecord> targetMap = new HashMap<>();
targetMap.put(targetKeyValue, transformRecord);
listenerFactory.offerTargetTaskQueue(targetMap);
@@ -101,20 +111,20 @@ public class EventRuleTransfer extends ServiceThread {
}
/**
- * Init sink task transform map
- * key: task config
- * value: transformEngine
+ * Init sink task transform map key: task config value: transformEngine
+ *
* @param taskConfig
* @return
*/
- private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(Map<String, List<TargetKeyValue>> taskConfig) {
+ private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(
+ Map<String, List<TargetKeyValue>> taskConfig) {
Map<TargetKeyValue, TransformEngine<ConnectRecord>> curTaskTransformMap = new HashMap<>();
Set<TargetKeyValue> allTaskKeySet = new HashSet<>();
- for(String connectName : taskConfig.keySet()){
+ for (String connectName : taskConfig.keySet()) {
List<TargetKeyValue> taskKeyList = taskConfig.get(connectName);
allTaskKeySet.addAll(new HashSet<>(taskKeyList));
}
- for(TargetKeyValue keyValue : allTaskKeySet){
+ for (TargetKeyValue keyValue : allTaskKeySet) {
TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(keyValue, plugin);
curTaskTransformMap.put(keyValue, transformChain);
}
@@ -125,14 +135,20 @@ public class EventRuleTransfer extends ServiceThread {
/**
* transform update listener
*/
- class TransformUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
-
- @Override
- public void onConfigUpdate(PusherTargetEntity targetEntity) {
- logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
- Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
- lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
- initOrUpdateTaskTransform(lastTargetMap);
- }
+ @Override
+ public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetRunnerConfig));
+ Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
+ lastTargetMap.put(targetRunnerConfig.getConnectName(), targetRunnerConfig.getTargetKeyValues());
+ initOrUpdateTaskTransform(lastTargetMap);
+ }
+
+ @Override
+ public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ }
+
+ @Override
+ public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
}
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
index d324fe4..bd9ddb0 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -22,27 +22,32 @@ import com.google.common.collect.Lists;
import io.netty.util.internal.ConcurrentSet;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.collections.MapUtils;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-
/**
* event target push to sink task
+ *
* @author artisan
*/
-public class EventTargetPusher extends ServiceThread {
+public class EventTargetPusher extends ServiceThread implements TargetRunnerListener {
private static final Logger logger = LoggerFactory.getLogger(EventTargetPusher.class);
@@ -56,29 +61,31 @@ public class EventTargetPusher extends ServiceThread {
private ListenerFactory listenerFactory;
- private PusherConfigManageService pusherConfigManageService;
+ private TargetRunnerConfigObserver targetRunnerConfigObserver;
private List<SinkTask> pusherTasks = new CopyOnWriteArrayList<>();
- public EventTargetPusher(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){
+ public EventTargetPusher(Plugin plugin, ListenerFactory listenerFactory,
+ TargetRunnerConfigObserver targetRunnerConfigObserver) {
this.plugin = plugin;
this.listenerFactory = listenerFactory;
- this.pusherConfigManageService = pusherConfigManageService;
- this.pusherConfigManageService.registerListener(new TargetUpdateListenerImpl());
+ this.targetRunnerConfigObserver = targetRunnerConfigObserver;
+ this.targetRunnerConfigObserver.registerListener(this);
}
/**
* init running tasks
+ *
* @param taskConfig
*/
- public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig){
+ public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig) {
Set<TargetKeyValue> taskProperty = new HashSet<>();
- for(String connectName : taskConfig.keySet()){
+ for (String connectName : taskConfig.keySet()) {
List<TargetKeyValue> targetKeyValues = taskConfig.get(connectName);
taskProperty.addAll(new HashSet<>(targetKeyValues));
}
- for(TargetKeyValue targetKeyValue : taskProperty){
- try{
+ for (TargetKeyValue targetKeyValue : taskProperty) {
+ try {
String taskClass = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
ClassLoader loader = plugin.getPluginClassLoader(taskClass);
Class taskClazz;
@@ -98,7 +105,7 @@ public class EventTargetPusher extends ServiceThread {
Plugin.compareAndSwapLoaders(loader);
}
logger.info("init target task succeed, target key - {}", JSON.toJSONString(targetKeyValue));
- }catch (Exception exception){
+ } catch (Exception exception) {
exception.printStackTrace();
}
}
@@ -106,9 +113,9 @@ public class EventTargetPusher extends ServiceThread {
@Override
public void run() {
- while (!stopped){
+ while (!stopped) {
Map<TargetKeyValue, ConnectRecord> taskPusher = listenerFactory.takeTargetMap();
- if(MapUtils.isEmpty(taskPusher)){
+ if (MapUtils.isEmpty(taskPusher)) {
logger.info("current target pusher is empty");
this.waitForRunning(1000);
continue;
@@ -121,8 +128,8 @@ public class EventTargetPusher extends ServiceThread {
// also add in ConnectRecord class system property
String taskPushName = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
// add thread pool
- for(SinkTask sinkTask : pusherTasks){
- if(sinkTask.getClass().getName().equals(taskPushName)){
+ for (SinkTask sinkTask : pusherTasks) {
+ if (sinkTask.getClass().getName().equals(taskPushName)) {
sinkTask.put(Lists.newArrayList(taskPusher.get(targetKeyValue)));
}
}
@@ -137,14 +144,22 @@ public class EventTargetPusher extends ServiceThread {
/**
* target update listener
*/
- class TargetUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener {
-
- @Override
- public void onConfigUpdate(PusherTargetEntity targetEntity) {
- logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity));
- Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
- lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues());
- initOrUpdatePusherTask(lastTargetMap);
- }
+
+ @Override
+ public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetRunnerConfig));
+ Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>();
+ lastTargetMap.put(targetRunnerConfig.getConnectName(), targetRunnerConfig.getTargetKeyValues());
+ initOrUpdatePusherTask(lastTargetMap);
}
+
+ @Override
+ public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ }
+
+ @Override
+ public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+ }
+
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
index 07f010e..44c29bb 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
@@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
@@ -52,7 +52,7 @@ public class ListenerFactory {
public static final String QUEUE_OFFSET = "queueOffset";
- private BlockingQueue<PusherTargetEntity> pusherTargetQueue = new LinkedBlockingQueue<>(1000);
+ private BlockingQueue<TargetRunnerConfig> pusherTargetQueue = new LinkedBlockingQueue<>(1000);
private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000);
@@ -76,7 +76,7 @@ public class ListenerFactory {
return consumer;
}
- public PusherTargetEntity takeTaskConfig(){
+ public TargetRunnerConfig takeTaskConfig(){
if(pusherTargetQueue.isEmpty()){
return null;
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
new file mode 100644
index 0000000..f528b7b
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+public interface TargetRunnerListener {
+
+ /**
+ * Call when add new target runner to runtimer.
+ *
+ * @param targetRunnerConfig
+ */
+ void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+ /**
+ * Call when the old target runner updated.
+ *
+ * @param targetRunnerConfig
+ */
+ void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+ /**
+ * Call when the old target runner deleted from runtimer.
+ *
+ * @param targetRunnerConfig
+ */
+ void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig);
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
similarity index 92%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java
rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
index dc7ced4..52525cf 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
@@ -26,7 +26,7 @@ import java.util.List;
* pusher target key config
*/
@Data
-public class PusherTargetEntity implements Serializable {
+public class TargetRunnerConfig implements Serializable {
private String connectName;
@@ -35,7 +35,7 @@ public class PusherTargetEntity implements Serializable {
@Override
public boolean equals(Object object){
if (object != null && object.getClass() == this.getClass()) {
- PusherTargetEntity targetEntity = (PusherTargetEntity) object;
+ TargetRunnerConfig targetEntity = (TargetRunnerConfig) object;
return this.connectName.equals(targetEntity.getConnectName())
&& this.targetKeyValues.size() == targetEntity.getTargetKeyValues().size()
&& this.targetKeyValues.containsAll(targetEntity.getTargetKeyValues());
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
new file mode 100644
index 0000000..619bc63
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import com.google.common.collect.Sets;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+@Service
+public abstract class AbstractTargetRunnerConfigObserver implements TargetRunnerConfigObserver {
+
+ private Set<TargetRunnerConfig> targetRunnerConfigs = Sets.newHashSet();
+
+ /**
+ * All listeners to trigger while config change.
+ */
+ private Set<TargetRunnerListener> targetRunnerConfigListeners = new HashSet<>();
+
+ public Set<TargetRunnerConfig> getTargetRunnerConfig() {
+ return targetRunnerConfigs;
+ }
+
+ public abstract Set<TargetRunnerConfig> getLatestTargetRunnerConfig();
+
+ @Override
+ public void registerListener(TargetRunnerListener listener) {
+ this.targetRunnerConfigListeners.add(listener);
+ }
+
+ void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ this.targetRunnerConfigs.add(targetRunnerConfig);
+ if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+ return;
+ }
+ for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+ listener.onAddTargetRunner(targetRunnerConfig);
+ }
+ }
+
+ /**
+ * Call when the old target runner updated.
+ *
+ * @param targetRunnerConfig
+ */
+ void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ this.targetRunnerConfigs.add(targetRunnerConfig);
+ if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+ return;
+ }
+ for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+ listener.onUpdateTargetRunner(targetRunnerConfig);
+ }
+ }
+
+ /**
+ * Call when the old target runner deleted from runtimer.
+ *
+ * @param targetRunnerConfig
+ */
+ void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ this.targetRunnerConfigs.remove(targetRunnerConfig);
+ if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) {
+ return;
+ }
+ for (TargetRunnerListener listener : this.targetRunnerConfigListeners) {
+ listener.onDeleteTargetRunner(targetRunnerConfig);
+ }
+ }
+
+ public Map<String, List<TargetKeyValue>> getTaskConfigs() {
+ return null;
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
deleted file mode 100644
index 58a66fd..0000000
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
-
-import com.google.common.collect.Lists;
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.component.connector.Connector;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.FilePathConfigUtil;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.FileBaseKeyValueStore;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.KeyValueStore;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.JsonConverter;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.ListConverter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-import org.springframework.util.CollectionUtils;
-
-import javax.annotation.PostConstruct;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-@Service
-public class PusherConfigManageServiceImpl implements PusherConfigManageService {
-
- /**
- * plugin for recognize class loader
- */
- private Plugin plugin;
-
- /**
- * Current connector configs in the store.
- */
- private KeyValueStore<String, TargetKeyValue> connectorKeyValueStore;
-
- /**
- * Current task configs in the store.
- */
- private KeyValueStore<String, List<TargetKeyValue>> taskKeyValueStore;
-
- /**
- * All listeners to trigger while config change.
- */
- private Set<TargetConfigUpdateListener> targetConfigUpdateListeners;
-
- private Set<String> connectTopicNames;
-
- @Value("${runtimer.storePathRootDir:}")
- private String storeRootPath;
-
- public PusherConfigManageServiceImpl(Plugin plugin){
- this.plugin = plugin;
- this.connectTopicNames = new CopyOnWriteArraySet<>();
- this.targetConfigUpdateListeners = new HashSet<>();
- }
-
- @PostConstruct
- public void initStoreKeyValue(){
- this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
- FilePathConfigUtil.getConnectorConfigPath(this.storeRootPath),
- new JsonConverter(),
- new JsonConverter(TargetKeyValue.class));
- this.taskKeyValueStore = new FileBaseKeyValueStore<>(
- FilePathConfigUtil.getTaskConfigPath(this.storeRootPath),
- new JsonConverter(),
- new ListConverter(TargetKeyValue.class));
- this.connectorKeyValueStore.load();
- this.taskKeyValueStore.load();
- }
-
- /**
- * get all connector configs enabled
- *
- * @return
- */
- @Override
- public Map<String, TargetKeyValue> getConnectorConfigs() {
- Map<String, TargetKeyValue> result = new HashMap<>();
- Map<String, TargetKeyValue> connectorConfigs = connectorKeyValueStore.getKVMap();
- for (String connectorName : connectorConfigs.keySet()) {
- TargetKeyValue config = connectorConfigs.get(connectorName);
- if (0 != config.getInt(RuntimerConfigDefine.CONFIG_DELETED)) {
- continue;
- }
- result.put(connectorName, config);
- }
- return result;
- }
-
- @Override
- public String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception {
- TargetKeyValue exist = connectorKeyValueStore.get(connectorName);
- if (null != exist) {
- Long updateTimestamp = exist.getLong(RuntimerConfigDefine.UPDATE_TIMESTAMP);
- if (null != updateTimestamp) {
- configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, updateTimestamp);
- }
- }
- if (configs.equals(exist)) {
- return "Connector with same config already exist.";
- }
-
- Long currentTimestamp = System.currentTimeMillis();
- configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
- for (String requireConfig : RuntimerConfigDefine.REQUEST_CONFIG) {
- if (!configs.containsKey(requireConfig)) {
- return "Request config key: " + requireConfig;
- }
- }
-
- String connectorClass = configs.getString(RuntimerConfigDefine.CONNECTOR_CLASS);
- ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
- Class clazz;
- if (null != classLoader) {
- clazz = Class.forName(connectorClass, true, classLoader);
- } else {
- clazz = Class.forName(connectorClass);
- }
- final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
- connector.validate(configs);
- connector.init(configs);
- connectorKeyValueStore.put(connectorName, configs);
- recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
- return "";
- }
-
- @Override
- public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs) {
- int maxTask = configs.getInt(RuntimerConfigDefine.MAX_TASK, 1);
- List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
- List<TargetKeyValue> converterdConfigs = new ArrayList<>();
- for (KeyValue keyValue : taskConfigs) {
- TargetKeyValue newKeyValue = new TargetKeyValue();
- for (String key : keyValue.keySet()) {
- newKeyValue.put(key, keyValue.getString(key));
- }
- newKeyValue.put(RuntimerConfigDefine.TASK_CLASS, connector.taskClass().getName());
- newKeyValue.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
-
- newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
- newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAMES));
- Set<String> connectConfigKeySet = configs.keySet();
- for (String connectConfigKey : connectConfigKeySet) {
- if (connectConfigKey.startsWith(RuntimerConfigDefine.TRANSFORMS)) {
- newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
- }
- }
- converterdConfigs.add(newKeyValue);
- connectTopicNames.add(configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
- }
- putTaskConfigs(connectorName, converterdConfigs);
- }
-
- @Override
- public void removeConnectorConfig(String connectorName) {
- TargetKeyValue config = connectorKeyValueStore.get(connectorName);
- if(Objects.isNull(config)){
- return;
- }
- config.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, System.currentTimeMillis());
- config.put(RuntimerConfigDefine.CONFIG_DELETED, 1);
- List<TargetKeyValue> taskConfigList = taskKeyValueStore.get(connectorName);
- taskConfigList.add(config);
- connectorKeyValueStore.put(connectorName, config);
- putTaskConfigs(connectorName, taskConfigList);
- }
-
- @Override
- public Map<String, List<TargetKeyValue>> getTaskConfigs() {
- Map<String, List<TargetKeyValue>> result = new HashMap<>();
- Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap();
- Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs();
- for (String connectorName : taskConfigs.keySet()) {
- if (!filteredConnector.containsKey(connectorName)) {
- continue;
- }
- result.put(connectorName, taskConfigs.get(connectorName));
- }
- return result;
- }
-
- @Override
- public Set<PusherTargetEntity> getTargetInfo() {
- Set<PusherTargetEntity> result = new HashSet<>();
- Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap();
- Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs();
- for (String connectorName : taskConfigs.keySet()) {
- if (!filteredConnector.containsKey(connectorName)) {
- continue;
- }
- PusherTargetEntity targetEntity = new PusherTargetEntity();
- targetEntity.setConnectName(connectorName);
- targetEntity.setTargetKeyValues(taskConfigs.get(connectorName));
- result.add(targetEntity);
- }
- return result;
- }
-
- @Override
- public List<String> getConnectTopics(){
- if(CollectionUtils.isEmpty(connectTopicNames)){
- return Lists.newArrayList();
- }
- return Lists.newArrayList(connectTopicNames);
- }
-
- @Override
- public void persist() {
- this.connectorKeyValueStore.persist();
- this.taskKeyValueStore.persist();
- }
-
- @Override
- public void registerListener(TargetConfigUpdateListener listener) {
- this.targetConfigUpdateListeners.add(listener);
- }
-
- /**
- * put target task key config for update
- * @param connectorName
- * @param configs
- */
- private void putTaskConfigs(String connectorName, List<TargetKeyValue> configs) {
- List<TargetKeyValue> exist = taskKeyValueStore.get(connectorName);
- if (null != exist && exist.size() > 0) {
- taskKeyValueStore.remove(connectorName);
- }
- taskKeyValueStore.put(connectorName, configs);
- PusherTargetEntity targetEntity = new PusherTargetEntity();
- targetEntity.setConnectName(connectorName);
- targetEntity.setTargetKeyValues(configs);
- triggerListener(targetEntity);
- persistStore();
- }
-
- private void persistStore() {
-
- }
-
- /**
- * trigger new target task config for update
- * @param pusherTargetEntity
- */
- private void triggerListener(PusherTargetEntity pusherTargetEntity) {
- if (CollectionUtils.isEmpty(this.targetConfigUpdateListeners)) {
- return;
- }
-
- for (TargetConfigUpdateListener listener : this.targetConfigUpdateListeners) {
- listener.onConfigUpdate(pusherTargetEntity);
- }
- }
-
-}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
similarity index 50%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java
rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
index 88a2001..c6c7670 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
@@ -17,82 +17,31 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
-import io.openmessaging.connector.api.component.connector.Connector;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
/**
* manage the pusher connector/task config info
*/
-public interface PusherConfigManageService {
-
- /**
- * Get all connector configs
- *
- * @return
- */
- Map<String, TargetKeyValue> getConnectorConfigs();
-
- /**
- * Put the configs.
- *
- * @param connectorName
- * @param configs
- * @return
- * @throws Exception
- */
- String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception;
-
- /**
- * Remove the connector
- *
- * @param connectorName
- */
- void removeConnectorConfig(String connectorName);
-
- void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs);
+public interface TargetRunnerConfigObserver {
/**
- * Get all Task configs.
- *
+ * Get the target runner config of runtimer.
* @return
*/
- Map<String, List<TargetKeyValue>> getTaskConfigs();
-
- /**
- * get target info
- * @return
- */
- Set<PusherTargetEntity> getTargetInfo();
-
- /**
- * Get all topics
- * @return
- */
- List<String> getConnectTopics();
-
- /**
- * Persist all the configs in a store.
- */
- void persist();
+ Set<TargetRunnerConfig> getTargetRunnerConfig();
/**
* Register a listener to listen all config update operations.
*
* @param listener
*/
- void registerListener(TargetConfigUpdateListener listener);
-
- interface TargetConfigUpdateListener {
-
- /**
- * Invoke while connector config changed.
- */
- void onConfigUpdate(PusherTargetEntity targetEntity);
- }
+ void registerListener(TargetRunnerListener listener);
+ @Deprecated
+ Map<String, List<TargetKeyValue>> getTaskConfigs();
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java
new file mode 100644
index 0000000..f51ac50
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+@Slf4j
+public class TargetRunnerConfigOnControllerObserver extends AbstractTargetRunnerConfigObserver {
+
+ @Override
+ public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+ return Sets.newHashSet();
+ }
+
+ public void add(TargetRunnerConfig targetRunnerConfig) {
+ super.onAddTargetRunner(targetRunnerConfig);
+ }
+
+ public void update(TargetRunnerConfig targetRunnerConfig) {
+ super.onUpdateTargetRunner(targetRunnerConfig);
+ }
+
+ public void delete(TargetRunnerConfig targetRunnerConfig) {
+ super.onDeleteTargetRunner(targetRunnerConfig);
+ }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
new file mode 100644
index 0000000..3d847a1
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+
+@Slf4j
+public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigObserver {
+
+ private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.newThreadFactory("TargetRunnerConfigOnDBObserver", false));
+
+ public TargetRunnerConfigOnDBObserver(Plugin plugin) {
+ this.addListen(this);
+ }
+
+ @Override
+ public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+ return null;
+ }
+
+ public void addListen(
+ TargetRunnerConfigOnDBObserver pusherConfigOnFileService) {
+ service.scheduleAtFixedRate(() -> {
+ try {
+ Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
+ Set<TargetRunnerConfig> last = super.getTargetRunnerConfig();
+ TargetRunnerConfig changed = null;
+ super.onAddTargetRunner(changed);
+ super.onUpdateTargetRunner(changed);
+ super.onDeleteTargetRunner(changed);
+ } catch (Throwable e) {
+ log.error("Watch failed.", e);
+ }
+ }, 0, 3, TimeUnit.SECONDS);
+ }
+
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
new file mode 100644
index 0000000..27269b8
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+import org.springframework.beans.factory.annotation.Value;
+
+@Slf4j
+public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver {
+
+ @Value("${runtimer.storePathRootDir:}")
+ private String storeRootPath;
+
+ @Value("${runtimer.store.targetRunner.config:targetRunner-config}")
+ private String fileName;
+
+ private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.newThreadFactory("TargetRunnerConfigOnFileObserver", false));
+
+ public TargetRunnerConfigOnFileObserver(Plugin plugin) {
+ this.addListen(storeRootPath, fileName, this);
+ }
+
+ @Override
+ public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
+ return null;
+ }
+
+ public void addListen(String pathStr, String fileName,
+ TargetRunnerConfigOnFileObserver pusherConfigOnFileService) {
+ log.info("Watching task file changing:{}", pathStr + fileName);
+ service.scheduleAtFixedRate(() -> {
+ try {
+ WatchService watchService = FileSystems.getDefault()
+ .newWatchService();
+ Path path = Paths.get(pathStr);
+ path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
+ StandardWatchEventKinds.ENTRY_MODIFY);
+ WatchKey watchKey;
+ while (true) {
+ watchKey = watchService.take();
+ if (watchKey != null && !watchKey.pollEvents()
+ .isEmpty()) {
+ log.info("Watched the file changed events.");
+ pusherConfigOnFileService.diff();
+ }
+ watchKey.reset();
+ }
+ } catch (Throwable e) {
+ log.error("Watch file failed.", e);
+ }
+ }, 0, 3, TimeUnit.SECONDS);
+ }
+
+ public void diff() {
+ Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
+ Set<TargetRunnerConfig> last = super.getTargetRunnerConfig();
+ TargetRunnerConfig changed = null;
+ super.onAddTargetRunner(changed);
+ super.onUpdateTargetRunner(changed);
+ super.onDeleteTargetRunner(changed);
+ }
+
+}
\ No newline at end of file