You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2023/03/31 03:34:46 UTC
[rocketmq-eventbridge] branch runtimer updated: Optimize Event Circulator Context (#70)
This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/runtimer by this push:
new 23529d4 Optimize Event Circulator Context (#70)
23529d4 is described below
commit 23529d43ac8baf91d281ec758e0b00e683cc58f0
Author: Artisan <Xi...@outlook.com>
AuthorDate: Fri Mar 31 11:34:41 2023 +0800
Optimize Event Circulator Context (#70)
Optimize Event Circulator Context
---
adapter/runtimer/pom.xml | 6 -
.../eventbridge/adapter/runtimer/Runtimer.java | 51 ++--
.../adapter/runtimer/boot/EventBusListener.java | 32 ++-
.../adapter/runtimer/boot/EventRuleTransfer.java | 36 +--
.../adapter/runtimer/boot/EventTargetPusher.java | 110 ++------
.../runtimer/boot/listener/CirculatorContext.java | 201 ++++++++++++++
.../runtimer/boot/listener/EventSubscriber.java | 18 +-
.../runtimer/boot/listener/ListenerFactory.java | 294 ---------------------
.../boot/listener/RocketMQEventSubscriber.java | 264 +++++++++++++++++-
.../boot/listener/TargetRunnerListener.java | 1 +
.../runtimer/common/entity/TargetKeyValue.java | 32 ++-
.../enums/RefreshTypeEnum.java} | 34 +--
.../runtimer/config/RuntimerConfigDefine.java | 2 +
.../service/TargetRunnerConfigOnFileObserver.java | 21 +-
.../src/main/resources/runtimer.properties | 23 ++
pom.xml | 1 +
16 files changed, 614 insertions(+), 512 deletions(-)
diff --git a/adapter/runtimer/pom.xml b/adapter/runtimer/pom.xml
index 49578b4..1e7c7ed 100644
--- a/adapter/runtimer/pom.xml
+++ b/adapter/runtimer/pom.xml
@@ -92,12 +92,6 @@
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter</artifactId>
- <version>RELEASE</version>
- <scope>compile</scope>
- </dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
index 682b830..f871e08 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -17,26 +17,22 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.PostConstruct;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventBusListener;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventRuleTransfer;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventTargetPusher;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.RocketMQEventSubscriber;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.AbstractTargetRunnerConfigObserver;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnFileObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import javax.annotation.PostConstruct;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* event bridge runtimer
*
@@ -49,34 +45,25 @@ public class Runtimer {
private AtomicReference<RuntimerState> runtimerState;
- private Plugin plugin;
-
- private ListenerFactory listenerFactory;
-
- private TargetRunnerConfigObserver targetRunnerConfigObserver;
-
- private Map<String, List<TargetKeyValue>> taskConfigs = new HashMap<>();
-
- private EventBusListener listener;
-
- private EventRuleTransfer transfer;
-
- private EventTargetPusher pusher;
+ private CirculatorContext circulatorContext;
- private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((Runnable r) -> new Thread(r, "RuntimerScheduledThread"));
+ private AbstractTargetRunnerConfigObserver runnerConfigObserver;
- public Runtimer(Plugin plugin, ListenerFactory listenerFactory, TargetRunnerConfigObserver configManageService) {
- this.plugin = plugin;
- this.listenerFactory = listenerFactory;
- this.targetRunnerConfigObserver = configManageService;
+ public Runtimer(CirculatorContext circulatorContext) {
+ this.circulatorContext = circulatorContext;
+ this.runnerConfigObserver = new TargetRunnerConfigOnFileObserver();
}
@PostConstruct
public void initAndStart() {
logger.info("init runtimer task config");
- new EventBusListener(listenerFactory, new RocketMQEventSubscriber(listenerFactory)).start();
- new EventRuleTransfer(listenerFactory).start();
- new EventTargetPusher(listenerFactory).start();
+ circulatorContext.initListenerMetadata(runnerConfigObserver.getTargetRunnerConfig());
+ EventSubscriber eventSubscriber = new RocketMQEventSubscriber(runnerConfigObserver);
+ runnerConfigObserver.registerListener(circulatorContext);
+ runnerConfigObserver.registerListener(eventSubscriber);
+ new EventBusListener(circulatorContext, eventSubscriber).start();
+ new EventRuleTransfer(circulatorContext).start();
+ new EventTargetPusher(circulatorContext).start();
startRuntimer();
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
index bc44668..d9227d0 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
@@ -18,10 +18,14 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
/**
* listen the event and offer to queue
@@ -30,25 +34,35 @@ import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
*/
public class EventBusListener extends ServiceThread {
- private ListenerFactory listenerFactory;
+ private static final Logger logger = LoggerFactory.getLogger(EventBusListener.class);
+
+ private CirculatorContext circulatorContext;
private EventSubscriber eventSubscriber;
- public EventBusListener(ListenerFactory listenerFactory,
- EventSubscriber eventSubscriber) {
- this.listenerFactory = listenerFactory;
+ public EventBusListener(CirculatorContext circulatorContext, EventSubscriber eventSubscriber) {
+ this.circulatorContext = circulatorContext;
this.eventSubscriber = eventSubscriber;
}
@Override
public void run() {
while (!stopped) {
- List<ConnectRecord> recordList = eventSubscriber.pull();
- listenerFactory.offerEventRecords(recordList);
+ try{
+ List<ConnectRecord> recordList = eventSubscriber.pull();
+ if(CollectionUtils.isEmpty(recordList)){
+ this.waitForRunning(1000);
+ continue;
+ }
+ circulatorContext.offerEventRecords(recordList);
+ }catch (Exception exception) {
+ logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception);
+ }
}
}
- @Override public String getServiceName() {
+ @Override
+ public String getServiceName() {
return EventBusListener.class.getSimpleName();
}
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index eaee64c..f9492f6 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -19,22 +19,17 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
import com.alibaba.fastjson.JSON;
import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* receive event and transfer the rule to pusher
@@ -43,12 +38,12 @@ public class EventRuleTransfer extends ServiceThread {
private static final Logger logger = LoggerFactory.getLogger(EventRuleTransfer.class);
- private ListenerFactory listenerFactory;
+ private CirculatorContext circulatorContext;
private ExecutorService executorService = new ThreadPoolExecutor(20, 60, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
- public EventRuleTransfer(ListenerFactory listenerFactory) {
- this.listenerFactory = listenerFactory;
+ public EventRuleTransfer(CirculatorContext circulatorContext) {
+ this.circulatorContext = circulatorContext;
}
@Override
@@ -59,8 +54,8 @@ public class EventRuleTransfer extends ServiceThread {
@Override
public void run() {
while (!stopped) {
- //TODO JAVA8 并发流处理
- ConnectRecord eventRecord = listenerFactory.takeEventRecord();
+ // TODO: add CompletableFuture
+ ConnectRecord eventRecord = circulatorContext.takeEventRecord();
if (Objects.isNull(eventRecord)) {
logger.info("listen eventRecord is empty, continue by curTime - {}", System.currentTimeMillis());
this.waitForRunning(1000);
@@ -69,22 +64,17 @@ public class EventRuleTransfer extends ServiceThread {
executorService.submit(() -> {
// extension add sub
// rule - target
- listenerFactory.getTaskTransformMap().entrySet().forEach(entry -> {
+ circulatorContext.getTaskTransformMap().entrySet().forEach(entry -> {
TransformEngine<ConnectRecord> transformEngine = entry.getValue();
ConnectRecord transformRecord = transformEngine.doTransforms(eventRecord);
if (Objects.isNull(transformRecord)) {
return;
}
// a bean for maintain
- listenerFactory.offerTargetTaskQueue(transformRecord);
+ circulatorContext.offerTargetTaskQueue(transformRecord);
logger.debug("offer target task queue succeed, targetMap - {}", JSON.toJSONString(transformRecord));
});
-
-
});
}
}
-
-
-
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
index 35c6bb2..9c17d2a 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -19,29 +19,17 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
-import io.netty.util.internal.ConcurrentSet;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import org.apache.commons.collections.MapUtils;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.Objects;
+
/**
* event target push to sink task
*
@@ -51,84 +39,30 @@ public class EventTargetPusher extends ServiceThread{
private static final Logger logger = LoggerFactory.getLogger(EventTargetPusher.class);
- private Set<Runnable> runningTasks = new ConcurrentSet<>();
-
- private Set<Runnable> errorTasks = new ConcurrentSet<>();
-
- private Set<Runnable> stoppedTasks = new ConcurrentSet<>();
-
- private Plugin plugin;
-
- private ListenerFactory listenerFactory;
-
- private TargetRunnerConfigObserver targetRunnerConfigObserver;
-
- private List<SinkTask> pusherTasks = new CopyOnWriteArrayList<>();
+ private CirculatorContext circulatorContext;
- public EventTargetPusher(ListenerFactory listenerFactory) {
- this.listenerFactory = listenerFactory;
- }
-
- /**
- * init running tasks
- *
- * @param taskConfig
- */
- public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig) {
- Set<TargetKeyValue> taskProperty = new HashSet<>();
- for (String connectName : taskConfig.keySet()) {
- List<TargetKeyValue> targetKeyValues = taskConfig.get(connectName);
- taskProperty.addAll(new HashSet<>(targetKeyValues));
- }
- for (TargetKeyValue targetKeyValue : taskProperty) {
- try {
- String taskClass = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
- ClassLoader loader = plugin.getPluginClassLoader(taskClass);
- Class taskClazz;
- boolean isolationFlag = false;
- if (loader instanceof PluginClassLoader) {
- taskClazz = ((PluginClassLoader) loader).loadClass(taskClass, false);
- isolationFlag = true;
- } else {
- taskClazz = Class.forName(taskClass);
- }
- SinkTask sinkTask = (SinkTask) taskClazz.getDeclaredConstructor().newInstance();
- sinkTask.init(targetKeyValue);
- PusherTaskContext sinkTaskContext = new PusherTaskContext(targetKeyValue);
- sinkTask.start(sinkTaskContext);
- pusherTasks.add(sinkTask);
- if (isolationFlag) {
- Plugin.compareAndSwapLoaders(loader);
- }
- logger.info("init target task succeed, target key - {}", JSON.toJSONString(targetKeyValue));
- } catch (Exception exception) {
- exception.printStackTrace();
- }
- }
+ public EventTargetPusher(CirculatorContext circulatorContext) {
+ this.circulatorContext = circulatorContext;
}
@Override
public void run() {
while (!stopped) {
-// Map<TargetKeyValue, ConnectRecord> taskPusher = listenerFactory.takeTargetMap();
-// if (MapUtils.isEmpty(taskPusher)) {
-// logger.info("current target pusher is empty");
-// this.waitForRunning(1000);
-// continue;
-// }
-// logger.info("start push content by pusher - {}", JSON.toJSONString(taskPusher));
-//
-// TargetKeyValue targetKeyValue = taskPusher.keySet().iterator().next();
-// // task-id for unique-key at ConnectKeyValue
-// // ConnectKeyValue -> new class for name
-// // also add in ConnectRecord class system property
-// String taskPushName = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
-// // add thread pool
-// for (SinkTask sinkTask : pusherTasks) {
-// if (sinkTask.getClass().getName().equals(taskPushName)) {
-// sinkTask.put(Lists.newArrayList(taskPusher.get(targetKeyValue)));
-// }
-// }
+ ConnectRecord targetRecord = circulatorContext.takeTargetMap();
+ if (Objects.isNull(targetRecord)) {
+ logger.info("current target pusher is empty");
+ this.waitForRunning(1000);
+ continue;
+ }
+ if(logger.isDebugEnabled()){
+ logger.debug("start push content by pusher - {}", JSON.toJSONString(targetRecord));
+ }
+
+ Map<String, SinkTask> latestTaskMap = circulatorContext.getPusherTaskMap();
+ String runnerName = targetRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME);
+ SinkTask sinkTask = latestTaskMap.get(runnerName);
+ // add thread pool
+ sinkTask.put(Lists.newArrayList(targetRecord));
}
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
new file mode 100644
index 0000000..58e6b04
--- /dev/null
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.RefreshTypeEnum;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * event circulator context for transfer and pusher
+ */
+@Component
+public class CirculatorContext implements TargetRunnerListener {
+
+ private final static Logger logger = LoggerFactory.getLogger(LoggerName.EventBus_Listener);
+
+ private BlockingQueue<ConnectRecord> eventRecord = new LinkedBlockingQueue<>(50000);
+
+ private BlockingQueue<ConnectRecord> targetQueue = new LinkedBlockingQueue<>(50000);
+
+ private Map<String/*RunnerName*/, TransformEngine<ConnectRecord>> taskTransformMap = new ConcurrentHashMap<>(20);
+
+ private Map<String/*RunnerName*/, SinkTask> pusherTaskMap = new ConcurrentHashMap<>(20);
+
+ private Plugin plugin;
+
+ public CirculatorContext(Plugin plugin){
+ this.plugin = plugin;
+ }
+
+ /**
+ * Initialize taskTransformMap and pusherTaskMap by registering each given target runner config
+ * @param targetRunnerConfigs
+ */
+ public void initListenerMetadata(Set<TargetRunnerConfig> targetRunnerConfigs) {
+ if (CollectionUtils.isEmpty(targetRunnerConfigs)) {
+ return;
+ }
+ for (TargetRunnerConfig targetRunnerConfig : targetRunnerConfigs) {
+ onAddTargetRunner(targetRunnerConfig);
+ }
+ }
+
+ @Override
+ public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ refreshRunnerMetadata(targetRunnerConfig, RefreshTypeEnum.ADD);
+ }
+
+ @Override
+ public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ refreshRunnerMetadata(targetRunnerConfig, RefreshTypeEnum.UPDATE);
+ }
+
+ @Override
+ public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+ refreshRunnerMetadata(targetRunnerConfig, RefreshTypeEnum.DELETE);
+ }
+
+ /**
+ * offer event records
+ *
+ * @param connectRecords
+ */
+ public boolean offerEventRecords(List<ConnectRecord> connectRecords) {
+ return eventRecord.addAll(connectRecords);
+ }
+
+ /**
+ * take event record
+ *
+ * @return
+ */
+ public ConnectRecord takeEventRecord() {
+ if (eventRecord.isEmpty()) {
+ return null;
+ }
+ try {
+ return eventRecord.take();
+ } catch (Exception exception) {
+ logger.error("take event record exception - stack-> ", exception);
+ }
+ return null;
+ }
+
+ public Map<String, TransformEngine<ConnectRecord>> getTaskTransformMap() {
+ return taskTransformMap;
+ }
+
+ public Map<String, SinkTask> getPusherTaskMap() {
+ return pusherTaskMap;
+ }
+
+ public boolean offerTargetTaskQueue(ConnectRecord connectRecord) {
+ return targetQueue.offer(connectRecord);
+ }
+
+ public ConnectRecord takeTargetMap() {
+ if (targetQueue.isEmpty()) {
+ return null;
+ }
+ try {
+ return targetQueue.take();
+ } catch (Exception exception) {
+ exception.printStackTrace();
+ }
+ return null;
+ }
+
+ /**
+ * Refresh the target runner metadata when its config changes
+ * @param targetRunnerConfig
+ * @param refreshTypeEnum
+ */
+ private void refreshRunnerMetadata(TargetRunnerConfig targetRunnerConfig, RefreshTypeEnum refreshTypeEnum) {
+ String runnerName = targetRunnerConfig.getName();
+ switch (refreshTypeEnum){
+ case ADD:
+ case UPDATE:
+ for(Map<String, String> configMap : targetRunnerConfig.getComponents()){
+ TargetKeyValue targetKeyValue = new TargetKeyValue(configMap);
+ TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(targetKeyValue, plugin);
+ taskTransformMap.put(runnerName, transformChain);
+
+ SinkTask sinkTask = initTargetSinkTask(targetKeyValue);
+ pusherTaskMap.put(runnerName, sinkTask);
+ }
+ break;
+ case DELETE:
+ taskTransformMap.remove(runnerName);
+ pusherTaskMap.remove(runnerName);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * init target sink task
+ * @param targetKeyValue
+ * @return
+ */
+ private SinkTask initTargetSinkTask(TargetKeyValue targetKeyValue) {
+ String taskClass = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
+ ClassLoader loader = plugin.getPluginClassLoader(taskClass);
+ Class taskClazz;
+ boolean isolationFlag = false;
+ try {
+ if (loader instanceof PluginClassLoader) {
+ taskClazz = ((PluginClassLoader) loader).loadClass(taskClass, false);
+ isolationFlag = true;
+ } else {
+ taskClazz = Class.forName(taskClass);
+ }
+ SinkTask sinkTask = (SinkTask) taskClazz.getDeclaredConstructor().newInstance();
+ sinkTask.init(targetKeyValue);
+ PusherTaskContext sinkTaskContext = new PusherTaskContext(targetKeyValue);
+ sinkTask.start(sinkTaskContext);
+ if (isolationFlag) {
+ Plugin.compareAndSwapLoaders(loader);
+ }
+ return sinkTask;
+ }catch (Exception e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
index 23c499b..f24ce08 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
@@ -18,12 +18,19 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.List;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.RefreshTypeEnum;
+
+import java.util.List;
public abstract class EventSubscriber implements TargetRunnerListener {
- abstract void refresh();
+ /**
+ * Refresh subscriber inner data when runner config changed
+ * @param targetRunnerConfig
+ * @param refreshTypeEnum
+ */
+ public abstract void refresh(TargetRunnerConfig targetRunnerConfig, RefreshTypeEnum refreshTypeEnum);
/**
* Pull connect records from store, Blocking method when is empty.
@@ -46,7 +53,7 @@ public abstract class EventSubscriber implements TargetRunnerListener {
*/
@Override
public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
- this.refresh();
+ this.refresh(targetRunnerConfig, RefreshTypeEnum.ADD);
}
/**
@@ -56,8 +63,7 @@ public abstract class EventSubscriber implements TargetRunnerListener {
*/
@Override
public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
-
- this.refresh();
+ this.refresh(targetRunnerConfig, RefreshTypeEnum.UPDATE);
}
/**
@@ -67,7 +73,7 @@ public abstract class EventSubscriber implements TargetRunnerListener {
*/
@Override
public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
- this.refresh();
+ this.refresh(targetRunnerConfig, RefreshTypeEnum.DELETE);
}
}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
deleted file mode 100644
index 2d22094..0000000
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
-
-import com.alibaba.fastjson.JSON;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import io.openmessaging.connector.api.component.task.sink.SinkTask;
-import io.openmessaging.connector.api.data.ConnectRecord;
-import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-@Component
-public class ListenerFactory implements TargetRunnerListener {
-
- private final static Logger logger = LoggerFactory.getLogger(LoggerName.EventBus_Listener);
-
- private static final String SEMICOLON = ";";
-
- private static final String SYS_DEFAULT_GROUP = "default-%s-group";
-
- public static final String QUEUE_OFFSET = "queueOffset";
-
- private Plugin plugin;
-
- private BlockingQueue<TargetRunnerConfig> pusherTargetQueue = new LinkedBlockingQueue<>(1000);
-
- private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000);
-
- private BlockingQueue<ConnectRecord> eventRecord = new LinkedBlockingQueue<>(50000);
-
- private BlockingQueue<ConnectRecord> targetQueue = new LinkedBlockingQueue<>(50000);
-
- private Map<String/*TargetRunnerName*/, TransformEngine<ConnectRecord>> taskTransformMap = new ConcurrentHashMap<>(20);
- private Map<String/*TargetRunnerName*/, SinkTask> pusherTaskMap = new ConcurrentHashMap<>(20);
- private Map<String/*TargetRunnerName*/, TargetRunnerConfig> targetRunnerConfigMap = new ConcurrentHashMap<>(20);
-
- @Value("${rocketmq.namesrvAddr:}")
- private String namesrvAddr;
-
- public DefaultLitePullConsumer initDefaultMQPullConsumer(String topic) {
- DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
- try {
- consumer.setConsumerGroup(String.format(SYS_DEFAULT_GROUP, topic));
- consumer.setNamesrvAddr(namesrvAddr);
- consumer.subscribe(topic, "*");
- consumer.start();
- } catch (Exception exception) {
- logger.error("init default pull consumer exception, topic -" + topic + "-stackTrace-", exception);
- }
- return consumer;
- }
-
- public TargetRunnerConfig takeTaskConfig() {
- if (pusherTargetQueue.isEmpty()) {
- return null;
- }
- try {
- return pusherTargetQueue.take();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return null;
- }
-
- /**
- * Offer listener event
- *
- * @param messageExt
- * @return
- */
- public boolean offerListenEvent(MessageExt messageExt) {
- return eventMessage.offer(messageExt);
- }
-
- public MessageExt takeListenerEvent() {
- if (eventMessage.isEmpty()) {
- return null;
- }
- try {
- return eventMessage.take();
- } catch (Exception exception) {
- exception.printStackTrace();
- }
- return null;
- }
-
- /**
- * offer event records
- *
- * @param connectRecords
- */
- public boolean offerEventRecords(List<ConnectRecord> connectRecords) {
- return eventRecord.addAll(connectRecords);
- }
-
- /**
- * take event record
- *
- * @return
- */
- public ConnectRecord takeEventRecord() {
- if (eventRecord.isEmpty()) {
- return null;
- }
- try {
- return eventRecord.take();
- } catch (Exception exception) {
- logger.error("take event record exception - stack-> ", exception);
- }
- return null;
- }
-
- public String createInstance(String servers) {
- String[] serversArray = servers.split(";");
- List<String> serversList = new ArrayList<String>();
- for (String server : serversArray) {
- if (!serversList.contains(server)) {
- serversList.add(server);
- }
- }
- Collections.sort(serversList);
- return String.valueOf(serversList.toString().hashCode());
- }
-
- /**
- * parse topic list by task config
- *
- * @param taskConfig
- * @return
- */
- public List<String> parseTopicList(TargetKeyValue taskConfig) {
- String messageQueueStr = taskConfig.getString(RuntimerConfigDefine.CONNECT_TOPICNAME);
- if (StringUtils.isBlank(messageQueueStr)) {
- return null;
- }
- List<String> topicList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
- return Lists.newArrayList(new HashSet<>(topicList));
- }
-
- /**
- * parse topic list by task config list
- *
- * @param taskConfigs
- * @return
- */
- public List<String> parseTopicListByList(List<TargetKeyValue> taskConfigs) {
- Set<String> allTopicList = Sets.newHashSet();
- for (TargetKeyValue taskConfig : taskConfigs) {
- String messageQueueStr = taskConfig.getString(RuntimerConfigDefine.CONNECT_TOPICNAME);
- if (StringUtils.isBlank(messageQueueStr)) {
- continue;
- }
- List<String> topicList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
- allTopicList.addAll(topicList);
- }
- return Lists.newArrayList(allTopicList);
- }
-
- /**
- * parse msg queue by queue json
- *
- * @param messageQueueStr
- * @return
- */
- public MessageQueue parseMessageQueueList(String messageQueueStr) {
- List<String> messageQueueStrList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
- if (CollectionUtils.isEmpty(messageQueueStrList) || messageQueueStrList.size() != 3) {
- return null;
- }
- return new MessageQueue(messageQueueStrList.get(0), messageQueueStrList.get(1), Integer.valueOf(messageQueueStrList.get(2)));
- }
-
- public RecordPartition convertToRecordPartition(String topic, String brokerName, int queueId) {
- Map<String, String> map = new HashMap<>();
- map.put("topic", topic);
- map.put("brokerName", brokerName);
- map.put("queueId", queueId + "");
- RecordPartition recordPartition = new RecordPartition(map);
- return recordPartition;
- }
-
- public RecordOffset convertToRecordOffset(Long offset) {
- Map<String, String> offsetMap = new HashMap<>();
- offsetMap.put(QUEUE_OFFSET, offset + "");
- RecordOffset recordOffset = new RecordOffset(offsetMap);
- return recordOffset;
- }
-
- public boolean offerTargetTaskQueue(ConnectRecord connectRecord) {
- return targetQueue.offer(connectRecord);
- }
-
- public ConnectRecord takeTargetMap() {
- if (targetQueue.isEmpty()) {
- return null;
- }
- try {
- return targetQueue.take();
- } catch (Exception exception) {
- exception.printStackTrace();
- }
- return null;
- }
-
- /**
- * Init sink task transform map key: task config value: transformEngine
- *
- * @param taskConfig
- * @return
- */
- private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(
- Map<String, List<TargetKeyValue>> taskConfig) {
- Map<TargetKeyValue, TransformEngine<ConnectRecord>> curTaskTransformMap = new HashMap<>();
- Set<TargetKeyValue> allTaskKeySet = new HashSet<>();
- for (String connectName : taskConfig.keySet()) {
- List<TargetKeyValue> taskKeyList = taskConfig.get(connectName);
- allTaskKeySet.addAll(new HashSet<>(taskKeyList));
- }
- for (TargetKeyValue keyValue : allTaskKeySet) {
- TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(keyValue, plugin);
- curTaskTransformMap.put(keyValue, transformChain);
- }
- logger.info("init sink task transform info succeed, transform map - {}", JSON.toJSONString(curTaskTransformMap));
- return curTaskTransformMap;
- }
-
-
- @Override public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
- // refresh taskTransformMap & pusherTaskMap & targetRunnerConfigMap
- }
-
- @Override public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
-
- }
-
- @Override public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
-
- }
-
- public Map<String, TransformEngine<ConnectRecord>> getTaskTransformMap() {
- return taskTransformMap;
- }
-
- public Map<String, SinkTask> getPusherTaskMap() {
- return pusherTaskMap;
- }
-
- public Map<String, TargetRunnerConfig> getTargetRunnerConfigMap() {
- return targetRunnerConfigMap;
- }
-
-}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
index 7c8e5a3..4ca0507 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
@@ -14,30 +14,276 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+import com.alibaba.fastjson.JSON;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.List;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.RefreshTypeEnum;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
+import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.support.PropertiesLoaderUtils;
+import org.springframework.util.CollectionUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.stream.Collectors;
+/**
+ * RocketMQ implement event subscriber
+ */
public class RocketMQEventSubscriber extends EventSubscriber {
- private ListenerFactory listenerFactory;
+ private static final Logger logger = LoggerFactory.getLogger(RocketMQEventSubscriber.class);
- public RocketMQEventSubscriber(
- ListenerFactory listenerFactory) {
- this.listenerFactory = listenerFactory;
- }
+ private DefaultLitePullConsumer pullConsumer;
+
+ private TargetRunnerConfigObserver runnerConfigObserver;
+
+ private Integer pullTimeOut;
+
+ private String namesrvAddr;
+
+ private static final String SEMICOLON = ";";
+
+ private static final String SYS_DEFAULT_GROUP = "event-bridge-default-group";
+
+ public static final String QUEUE_OFFSET = "queueOffset";
- @Override void refresh() {
+ public RocketMQEventSubscriber(TargetRunnerConfigObserver runnerConfigObserver) {
+ this.runnerConfigObserver = runnerConfigObserver;
+ this.initMqProperties();
+ this.initPullConsumer();
+ }
+ /**
+ * Applies one target runner config change to the pull consumer's topic subscriptions.
+ * When no consumer exists yet, a new one is created from the observer's current configs and
+ * the method returns. Otherwise every topic named by the given config is subscribed
+ * (ADD, UPDATE) or unsubscribed (DELETE).
+ * NOTE: UPDATE only subscribes the topics of the new config; topics that the update removed
+ * are not unsubscribed here because the previous config is not available.
+ *
+ * @param targetRunnerConfig the runner config that changed
+ * @param refreshTypeEnum the kind of change
+ */
+ @Override
+ public void refresh(TargetRunnerConfig targetRunnerConfig, RefreshTypeEnum refreshTypeEnum) {
+ if(Objects.isNull(pullConsumer)){
+ pullConsumer = initDefaultMQPullConsumer();
+ return;
+ }
+ if(Objects.isNull(targetRunnerConfig) || Objects.isNull(refreshTypeEnum)){
+ logger.warn("refresh ignored, target runner config or refresh type is null");
+ return;
+ }
+ Set<String> currentTopics = parseTopicsByRunnerConfigs(Sets.newHashSet(targetRunnerConfig));
+ if(CollectionUtils.isEmpty(currentTopics)){
+ return;
+ }
+ for (String topic : currentTopics){
+ // A component map without a topic entry yields a null element; never hand that to the consumer.
+ if(StringUtils.isBlank(topic)){
+ continue;
+ }
+ switch (refreshTypeEnum){
+ case ADD:
+ case UPDATE:
+ subscribe(topic);
+ break;
+ case DELETE:
+ unSubscribe(topic);
+ break;
+ default:
+ break;
+ }
+ }
}
- @Override public List<ConnectRecord> pull() {
- return null;
+ /**
+ * Polls the pull consumer once and converts every received message into a connect record.
+ *
+ * @return the converted records, or null when the poll returned no messages
+ */
+ @Override
+ public List<ConnectRecord> pull() {
+ // pullTimeOut is still null when runtimer.properties could not be read or parsed;
+ // fall back to 3000 ms instead of failing while unboxing.
+ long timeout = Objects.nonNull(pullTimeOut) ? pullTimeOut : 3000L;
+ List<MessageExt> messageExts = pullConsumer.poll(timeout);
+ if (CollectionUtils.isEmpty(messageExts)) {
+ // An empty poll is the idle case: log it at debug only and log the group name
+ // rather than serializing the whole consumer object on every poll.
+ if(logger.isDebugEnabled()){
+ logger.debug("consumer poll message empty , consumer - {}", pullConsumer.getConsumerGroup());
+ }
+ return null;
+ }
+ List<ConnectRecord> connectRecords = Lists.newArrayList();
+ for (MessageExt messageExt : messageExts) {
+ ConnectRecord eventRecord = convertToSinkRecord(messageExt);
+ connectRecords.add(eventRecord);
+ if(logger.isDebugEnabled()){
+ logger.debug("offer listen event record - {} - by message event- {}", eventRecord, messageExt);
+ }
+ }
+ return connectRecords;
}
@Override
public void commit(List<ConnectRecord> connectRecordList) {
}
+
+ /**
+ * Collects the distinct topic names referenced by the given target runner configs.
+ * Each config's component maps are read for the RuntimerConfigDefine.CONNECT_TOPICNAME entry;
+ * null configs, null component maps and blank topic values are skipped.
+ *
+ * @param targetRunnerConfigs the runner configs to scan; may be null or empty
+ * @return the topic names found; never null, empty when nothing declares a topic
+ */
+ public Set<String> parseTopicsByRunnerConfigs(Set<TargetRunnerConfig> targetRunnerConfigs){
+ Set<String> listenTopics = Sets.newHashSet();
+ if(org.apache.commons.collections.CollectionUtils.isEmpty(targetRunnerConfigs)){
+ logger.warn("target runner config is empty, parse to topic failed!");
+ // Callers iterate the result directly, so hand back an empty set instead of null.
+ return listenTopics;
+ }
+ for(TargetRunnerConfig runnerConfig : targetRunnerConfigs){
+ if(Objects.isNull(runnerConfig)){
+ continue;
+ }
+ List<Map<String,String>> components = runnerConfig.getComponents();
+ if(org.apache.commons.collections.CollectionUtils.isEmpty(components)){
+ continue;
+ }
+ for(Map<String,String> component : components){
+ if(Objects.isNull(component)){
+ continue;
+ }
+ String topic = component.get(RuntimerConfigDefine.CONNECT_TOPICNAME);
+ if(StringUtils.isNotBlank(topic)){
+ listenTopics.add(topic);
+ }
+ }
+ }
+ return listenTopics;
+ }
+
+ /**
+ * Loads the name server address and the poll timeout from runtimer.properties on the classpath.
+ * A load or parse failure is logged; the poll timeout then falls back to 3000 ms so that
+ * pull() does not fail on a missing value.
+ */
+ private void initMqProperties() {
+ try {
+ Properties properties = PropertiesLoaderUtils.loadAllProperties("runtimer.properties");
+ namesrvAddr = properties.getProperty("rocketmq.namesrvAddr");
+ pullTimeOut = Integer.valueOf(properties.getProperty("rocketmq.consumer.pullTimeOut"));
+ }catch (Exception exception){
+ // Do not swallow silently: a missing file or malformed number must be visible in the log.
+ logger.error("init rocketmq properties from runtimer.properties failed, stack trace - ", exception);
+ }
+ if(Objects.isNull(pullTimeOut)){
+ pullTimeOut = 3000;
+ }
+ }
+
+ /**
+ * init rocket mq pull consumer
+ */
+ private void initPullConsumer() {
+ pullConsumer = initDefaultMQPullConsumer();
+ }
+
+ /**
+ * Creates and starts a lite pull consumer subscribed to every topic named by the
+ * observer's current target runner configs.
+ *
+ * @return the started consumer
+ * @throws EventBridgeException when subscribing to a topic or starting the consumer fails
+ */
+ public DefaultLitePullConsumer initDefaultMQPullConsumer () {
+ Set<TargetRunnerConfig> targetRunnerConfigs = runnerConfigObserver.getTargetRunnerConfig();
+ Set<String> topics = parseTopicsByRunnerConfigs(targetRunnerConfigs);
+ // Guard against a null result so neither the loop nor the error log below can fail on it.
+ if(Objects.isNull(topics)){
+ topics = Sets.newHashSet();
+ }
+ DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
+ consumer.setConsumerGroup(createGroupName(SYS_DEFAULT_GROUP));
+ consumer.setNamesrvAddr(namesrvAddr);
+ try {
+ for(String topic : topics){
+ if(StringUtils.isBlank(topic)){
+ continue;
+ }
+ consumer.subscribe(topic, "*");
+ }
+ consumer.start();
+ } catch (Exception exception) {
+ logger.error("init default pull consumer exception, topic -" + topics.toString() + "-stackTrace-", exception);
+ throw new EventBridgeException(" init rocketmq consumer failed");
+ }
+ return consumer;
+ }
+
+ private String createGroupName(String prefix) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(prefix).append("-");
+ sb.append(RemotingUtil.getLocalAddress()).append("-");
+ sb.append(UtilAll.getPid()).append("-");
+ sb.append(System.nanoTime());
+ return sb.toString().replace(".", "-");
+ }
+
+ private String createInstance(String servers) {
+ String[] serversArray = servers.split(";");
+ List<String> serversList = new ArrayList<String>();
+ for (String server : serversArray) {
+ if (!serversList.contains(server)) {
+ serversList.add(server);
+ }
+ }
+ Collections.sort(serversList);
+ return String.valueOf(serversList.toString().hashCode());
+ }
+
+ /**
+ * parse msg queue by queue json
+ *
+ * @param messageQueueStr
+ * @return
+ */
+ private MessageQueue parseMessageQueueList(String messageQueueStr) {
+ List<String> messageQueueStrList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
+ if (org.apache.commons.collections.CollectionUtils.isEmpty(messageQueueStrList) || messageQueueStrList.size() != 3) {
+ return null;
+ }
+ return new MessageQueue(messageQueueStrList.get(0), messageQueueStrList.get(1), Integer.valueOf(messageQueueStrList.get(2)));
+ }
+
+ /**
+ * Converts a consumed RocketMQ message into a connect record.
+ * The record carries the message body decoded as UTF-8, a partition built from topic, broker
+ * name and queue id, an offset built from the queue offset, and the optional timestamp and
+ * schema read from the message properties. The topic and all message properties are copied
+ * into the record's extensions.
+ *
+ * @param messageExt the consumed message
+ * @return the connect record built from the message
+ */
+ private ConnectRecord convertToSinkRecord(MessageExt messageExt) {
+ // The original only guarded the copy loop against an empty property map while the lookups
+ // below dereferenced it unguarded; normalise once so both paths are safe.
+ Map<String, String> properties = messageExt.getProperties();
+ if (Objects.isNull(properties)) {
+ properties = Collections.emptyMap();
+ }
+ String connectTimestamp = properties.get(RuntimerConfigDefine.CONNECT_TIMESTAMP);
+ Long timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
+ String connectSchema = properties.get(RuntimerConfigDefine.CONNECT_SCHEMA);
+ Schema schema = StringUtils.isNotEmpty(connectSchema) ? JSON.parseObject(connectSchema, Schema.class) : null;
+ byte[] body = messageExt.getBody();
+ RecordPartition recordPartition = convertToRecordPartition(messageExt.getTopic(), messageExt.getBrokerName(), messageExt.getQueueId());
+ RecordOffset recordOffset = convertToRecordOffset(messageExt.getQueueOffset());
+ String bodyStr = new String(body, StandardCharsets.UTF_8);
+ ConnectRecord sinkRecord = new ConnectRecord(recordPartition, recordOffset, timestamp, schema, bodyStr);
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME, messageExt.getTopic());
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ keyValue.put(entry.getKey(), entry.getValue());
+ }
+ sinkRecord.addExtension(keyValue);
+ return sinkRecord;
+ }
+
+ private RecordPartition convertToRecordPartition(String topic, String brokerName, int queueId) {
+ Map<String, String> map = new HashMap<>();
+ map.put("topic", topic);
+ map.put("brokerName", brokerName);
+ map.put("queueId", queueId + "");
+ RecordPartition recordPartition = new RecordPartition(map);
+ return recordPartition;
+ }
+
+ private RecordOffset convertToRecordOffset(Long offset) {
+ Map<String, String> offsetMap = new HashMap<>();
+ offsetMap.put(QUEUE_OFFSET, offset + "");
+ RecordOffset recordOffset = new RecordOffset(offsetMap);
+ return recordOffset;
+ }
+
+ /**
+ * new topic for subscribe
+ * @param topic
+ */
+ private void subscribe(String topic) {
+ try {
+ pullConsumer.subscribe(topic, "*");
+ } catch (MQClientException exception) {
+ logger.error("rocketmq event subscribe new topic failed, stack trace - ", exception);
+ }
+ }
+
+ /**
+ * unsubscribe old topic
+ * @param topic
+ */
+ private void unSubscribe(String topic) {
+ pullConsumer.unsubscribe(topic);
+ }
+
}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
index f528b7b..8284e8a 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
index c3c6a07..5d52128 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
@@ -18,11 +18,10 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity;
import io.openmessaging.KeyValue;
-import org.springframework.util.IdGenerator;
-import org.springframework.util.SimpleIdGenerator;
import java.io.Serializable;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -47,6 +46,11 @@ public class TargetKeyValue implements KeyValue, Serializable {
targetKeyId = UUID.randomUUID().toString();
}
+ public TargetKeyValue(Map<String, String> targetMap) {
+ properties = new ConcurrentHashMap<>(targetMap);
+ targetKeyId = UUID.randomUUID().toString();
+ }
+
@Override
public KeyValue put(String key, int value) {
properties.put(key, String.valueOf(value));
@@ -136,24 +140,24 @@ public class TargetKeyValue implements KeyValue, Serializable {
}
@Override
- public boolean equals(Object obj) {
-
- if (obj != null && obj.getClass() == this.getClass()) {
- TargetKeyValue keyValue = (TargetKeyValue) obj;
- return this.properties.equals(keyValue.getProperties());
- }
- return false;
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TargetKeyValue that = (TargetKeyValue) o;
+ return Objects.equals(targetKeyId, that.targetKeyId) && Objects.equals(properties, that.properties);
}
@Override
public int hashCode() {
- return properties.hashCode();
+ return Objects.hash(targetKeyId, properties);
}
- @Override public String toString() {
- return "ConnectKeyValue{" +
- "properties=" + properties +
- '}';
+ @Override
+ public String toString() {
+ return "TargetKeyValue{" +
+ "targetKeyId='" + targetKeyId + '\'' +
+ ", properties=" + properties +
+ '}';
}
public String getTargetKeyId() {
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/enums/RefreshTypeEnum.java
similarity index 57%
copy from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
copy to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/enums/RefreshTypeEnum.java
index 7c8e5a3..9e8f6cd 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/enums/RefreshTypeEnum.java
@@ -14,30 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
-import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.List;
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums;
-public class RocketMQEventSubscriber extends EventSubscriber {
-
- private ListenerFactory listenerFactory;
-
- public RocketMQEventSubscriber(
- ListenerFactory listenerFactory) {
- this.listenerFactory = listenerFactory;
- }
-
- @Override void refresh() {
-
- }
-
- @Override public List<ConnectRecord> pull() {
- return null;
- }
-
- @Override
- public void commit(List<ConnectRecord> connectRecordList) {
-
- }
-}
\ No newline at end of file
+/**
+ * runner refresh type
+ */
+public enum RefreshTypeEnum {
+ ADD,
+ UPDATE,
+ DELETE
+}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
index 500f300..04895cb 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
@@ -37,6 +37,8 @@ public class RuntimerConfigDefine {
public static final String TASK_CLASS = "task-class";
+ public static final String RUNNER_NAME = "runner-name";
+
public static final String TASK_ID = "task-id";
public static final String TASK_TYPE = "task-type";
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
index b963c72..bbb69b3 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -37,15 +37,20 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+import org.springframework.stereotype.Service;
@Slf4j
public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver {
private String pathName;
+ public static final String DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME = "target-runner.json";
+
+
private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.newThreadFactory("TargetRunnerConfigOnFileObserver", false));
@@ -57,6 +62,9 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig
public TargetRunnerConfigOnFileObserver() {
+ // Resolve the config file path before anything reads it: pathName has no initializer, and
+ // both the initial config load (presumably via new File(pathName), see the hunk below) and
+ // addListen depend on it being set.
+ if(StringUtils.isEmpty(pathName)){
+ this.pathName = getConfigFilePath();
+ }
super.getTargetRunnerConfig().addAll(getLatestTargetRunnerConfig());
this.addListen(pathName, this);
}
@@ -66,8 +74,7 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig
try {
File file = new File(pathName);
config = FileUtils.readFileToString(file, "UTF-8");
- Type workerConfigType = new TypeToken<HashSet<TargetRunnerConfig>>() {
- }.getType();
+ Type workerConfigType = new TypeToken<HashSet<TargetRunnerConfig>>() {}.getType();
Set<TargetRunnerConfig> taskConfigList = new Gson().fromJson(config, workerConfigType);
return taskConfigList;
} catch (IOException e) {
@@ -78,16 +85,14 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig
}
}
- public void addListen(String pathName,
- TargetRunnerConfigOnFileObserver pusherConfigOnFileService) {
+ public void addListen(String pathName, TargetRunnerConfigOnFileObserver pusherConfigOnFileService) {
log.info("Watching task file changing:{}", pathName);
int index = pathName.lastIndexOf("/");
String filePath = pathName.substring(0, index);
String fileName = pathName.substring(index + 1);
service.scheduleAtFixedRate(() -> {
try {
- WatchService watchService = FileSystems.getDefault()
- .newWatchService();
+ WatchService watchService = FileSystems.getDefault().newWatchService();
Path path = Paths.get(filePath);
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
@@ -136,4 +141,8 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig
return map;
}
+ /**
+ * Resolves the default target runner config file (target-runner.json) on the classpath.
+ *
+ * @return the path component of the resource URL
+ * @throws EventBridgeException when the file is not present on the classpath
+ */
+ private String getConfigFilePath() {
+ java.net.URL resource = this.getClass().getClassLoader().getResource(DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME);
+ // getResource returns null for a missing resource; fail with a message that names the file.
+ if (resource == null) {
+ throw new EventBridgeException("target runner config file not found on classpath: " + DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME);
+ }
+ return resource.getPath();
+ }
+
}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/resources/runtimer.properties b/adapter/runtimer/src/main/resources/runtimer.properties
new file mode 100644
index 0000000..b5659c1
--- /dev/null
+++ b/adapter/runtimer/src/main/resources/runtimer.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+## rocketmq
+rocketmq.namesrvAddr=localhost:9876
+rocketmq.consumer.pullTimeOut = 3000
+rocketmq.cluster.name=DefaultCluster
+## runtimer
+rumtimer.name=eventbridge-runtimer
+runtimer.pluginpath=/Users/Local/eventbridge/plugin
+runtimer.storePathRootDir=/Users/Local/eventbridge/store
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 95bc7de..7e319aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,7 @@
<cloudevents.version>2.3.0</cloudevents.version>
<apache.commons-text.version>1.10.0</apache.commons-text.version>
<mockito.version>2.13.0</mockito.version>
+ <jupiter.version>5.9.2</jupiter.version>
<caffeine.version>2.9.3</caffeine.version>
<fastjson.version>1.2.76</fastjson.version>
<jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>