You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/04/01 07:38:04 UTC
[rocketmq-connect] branch master updated: [ISSUE #16] [Part 4] Support matrix (#17)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new f887523 [ISSUE #16] [Part 4] Support matrix (#17)
f887523 is described below
commit f887523d5412c51dad033ea0f54205701b72a8fc
Author: zhoubo <87...@qq.com>
AuthorDate: Fri Apr 1 15:37:59 2022 +0800
[ISSUE #16] [Part 4] Support matrix (#17)
* fix load connector api repeat
* reblance add try cache
* supprot matrix
* source connect put extension to msg property
* extendion key add MQ-SYS- && connect-ext- prefix
* fix file connect bug and add demo transform
* fix matrix null pointer
* connect ,task start stop add try cache
* fix transform config lost bug
* fix transform config lost bug
* transform remove prefix
* fix load connector api repeat
* transform remove prefix
* fix transform validate config error
* org.slf4j load by runtime
* remove transform class key config
* remove transformchain main
* add topic config volatile, producer send add topic log
---
.../connect/runtime/ConnectController.java | 24 +-
.../connect/runtime/common/LoggerName.java | 3 +
.../runtime/config/RuntimeConfigDefine.java | 1 +
.../runtime/connectorwrapper/TransformChain.java | 10 +-
.../connect/runtime/connectorwrapper/Worker.java | 215 +++++++------
.../runtime/connectorwrapper/WorkerSinkTask.java | 102 ++++--
.../runtime/connectorwrapper/WorkerSourceTask.java | 83 +++--
.../rocketmq/connect/runtime/rest/RestHandler.java | 10 +-
.../service/ConfigManagementServiceImpl.java | 6 +
.../runtime/service/TaskPositionCommitService.java | 9 +-
.../connect/runtime/stats/ConnectStats.java | 80 +++++
.../connect/runtime/stats/ConnectStatsManager.java | 284 +++++++++++++++++
.../connect/runtime/stats/ConnectStatsService.java | 348 +++++++++++++++++++++
.../connect/runtime/utils/ConnectUtil.java | 27 ++
.../connect/runtime/utils/PluginUtils.java | 1 +
.../src/main/resources/logback.xml | 18 ++
.../runtime/connectorwrapper/WorkerTest.java | 15 +-
.../connect/runtime/rest/RestHandlerTest.java | 12 +-
.../apache/rocketmq/connect/file/FileSinkTask.java | 18 +-
.../rocketmq/connect/file/FileSourceTask.java | 3 +-
.../rocketmq/connect/file/FilterTransform.java | 56 ++++
21 files changed, 1131 insertions(+), 194 deletions(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
index 71c8fda..3971656 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
@@ -37,6 +37,8 @@ import org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl
import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
import org.apache.rocketmq.connect.runtime.service.RebalanceService;
import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.connect.runtime.utils.Plugin;
import org.slf4j.Logger;
@@ -101,6 +103,10 @@ public class ConnectController {
private final Plugin plugin;
+ private ConnectStatsManager connectStatsManager;
+
+ private final ConnectStatsService connectStatsService;
+
public ConnectController(
ConnectConfig connectConfig) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
@@ -115,13 +121,14 @@ public class ConnectController {
}
plugin = new Plugin(pluginPaths);
plugin.initPlugin();
-
+ this.connectStatsManager = new ConnectStatsManager(connectConfig);
+ this.connectStatsService = new ConnectStatsService();
this.connectConfig = connectConfig;
this.clusterManagementService = new ClusterManagementServiceImpl(connectConfig);
this.configManagementService = new ConfigManagementServiceImpl(connectConfig, plugin);
this.positionManagementService = new PositionManagementServiceImpl(connectConfig);
this.offsetManagementService = new OffsetManagementServiceImpl(connectConfig);
- this.worker = new Worker(connectConfig, positionManagementService, offsetManagementService, plugin);
+ this.worker = new Worker(connectConfig, positionManagementService, configManagementService, plugin, this);
AllocateConnAndTaskStrategy strategy = ConnectUtil.initAllocateConnAndTaskStrategy(connectConfig);
this.rebalanceImpl = new RebalanceImpl(worker, configManagementService, clusterManagementService, strategy, this);
this.restHandler = new RestHandler(this);
@@ -140,6 +147,7 @@ public class ConnectController {
offsetManagementService.start();
worker.start();
rebalanceService.start();
+ connectStatsService.start();
// Persist configurations of current connectors and tasks.
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
@@ -233,4 +241,16 @@ public class ConnectController {
public RebalanceImpl getRebalanceImpl() {
return rebalanceImpl;
}
+
+ public ConnectStatsManager getConnectStatsManager() {
+ return connectStatsManager;
+ }
+
+ public void setConnectStatsManager(ConnectStatsManager connectStatsManager) {
+ this.connectStatsManager = connectStatsManager;
+ }
+
+ public ConnectStatsService getConnectStatsService() {
+ return connectStatsService;
+ }
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/LoggerName.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/LoggerName.java
index b01dc36..2b62956 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/LoggerName.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/LoggerName.java
@@ -22,4 +22,7 @@ package org.apache.rocketmq.connect.runtime.common;
*/
public class LoggerName {
public static final String ROCKETMQ_RUNTIME = "RocketMQRuntime";
+ public static final String BROKER_BASE_LOG = "BrokerBaseLog";
+ public static final String COMMON_LOGGER_NAME = "ConnectCommon";
+ public static final String ROCKETMQ_CONNECT_STATS = "RocketmqConnectStats";
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
index 7885475..d7a13e2 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
@@ -99,6 +99,7 @@ public class RuntimeConfigDefine {
public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
{
add(CONNECTOR_CLASS);
+ add(CONNECT_TOPICNAME);
}
};
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index ae89572..3de0222 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -68,17 +68,19 @@ public class TransformChain<R extends ConnectRecord> {
return;
}
transformList.stream().forEach(transformStr -> {
- String transformClass = config.getString(PREFIX + transformStr + "-class");
+ String transformClassKey = PREFIX + transformStr + "-class";
+ String transformClass = config.getString(transformClassKey);
try {
Transform transform = getTransform(transformClass);
- transform.validate(config);
KeyValue transformConfig = new DefaultKeyValue();
Set<String> configKeys = config.keySet();
for (String key : configKeys) {
- if (key.startsWith(PREFIX + transformStr)) {
- transformConfig.put(key, config.getString(key));
+ if (key.startsWith(PREFIX + transformStr) && !key.equals(transformClassKey)) {
+ String originKey = key.replace(PREFIX + transformStr + "-", "");
+ transformConfig.put(originKey, config.getString(key));
}
}
+ transform.validate(transformConfig);
transform.init(transformConfig);
this.transformList.add(transform);
} catch (Exception e) {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index b899509..e22f969 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.connect.runtime.connectorwrapper;
+import com.alibaba.fastjson.JSON;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.ConcurrentSet;
import io.openmessaging.connector.api.component.connector.Connector;
@@ -48,9 +49,12 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.service.TaskPositionCommitService;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.connect.runtime.utils.Plugin;
import org.apache.rocketmq.connect.runtime.utils.PluginClassLoader;
@@ -86,15 +90,12 @@ public class Worker {
private Set<Runnable> cleanedStoppedTasks = new ConcurrentSet<>();
-
Map<String, List<ConnectKeyValue>> latestTaskConfigs = new HashMap<>();
/**
* Current running tasks to its Future map.
*/
private Map<Runnable, Future> taskToFutureMap = new ConcurrentHashMap<>();
-
-
/**
* Thread pool for connectors and tasks.
*/
@@ -106,11 +107,6 @@ public class Worker {
private final PositionManagementService positionManagementService;
/**
- * Offset management for source tasks.
- */
- private final PositionManagementService offsetManagementService;
-
- /**
* A scheduled task to commit source position of source tasks.
*/
private final TaskPositionCommitService taskPositionCommitService;
@@ -119,30 +115,33 @@ public class Worker {
private final Plugin plugin;
- private static final int MAX_START_TIMEOUT_MILLS = 5000;
+ private static final int MAX_START_TIMEOUT_MILLS = 5000;
- private static final long MAX_STOP_TIMEOUT_MILLS = 20000;
+ private static final long MAX_STOP_TIMEOUT_MILLS = 20000;
/**
* Atomic state variable
*/
private AtomicReference<WorkerState> workerState;
-
private StateMachineService stateMachineService = new StateMachineService();
+ private final ConnectStatsManager connectStatsManager;
+
+ private final ConnectStatsService connectStatsService;
+
public Worker(ConnectConfig connectConfig,
- PositionManagementService positionManagementService, PositionManagementService offsetManagementService,
- Plugin plugin) {
+ PositionManagementService positionManagementService, ConfigManagementService configManagementService,
+ Plugin plugin, ConnectController connectController) {
this.connectConfig = connectConfig;
this.taskExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("task-Worker-Executor-"));
this.positionManagementService = positionManagementService;
- this.offsetManagementService = offsetManagementService;
this.taskPositionCommitService = new TaskPositionCommitService(
this,
- positionManagementService,
- offsetManagementService);
+ positionManagementService);
this.plugin = plugin;
+ this.connectStatsManager = connectController.getConnectStatsManager();
+ this.connectStatsService = connectController.getConnectStatsService();
}
public void start() {
@@ -160,17 +159,21 @@ public class Worker {
* @throws Exception
*/
public synchronized void startConnectors(Map<String, ConnectKeyValue> connectorConfigs,
- ConnectController connectController) throws Exception {
+ ConnectController connectController) throws Exception {
Set<WorkerConnector> stoppedConnector = new HashSet<>();
for (WorkerConnector workerConnector : workingConnectors) {
- String connectorName = workerConnector.getConnectorName();
- ConnectKeyValue keyValue = connectorConfigs.get(connectorName);
- if (null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED)) {
- workerConnector.stop();
- log.info("Connector {} stop", workerConnector.getConnectorName());
- stoppedConnector.add(workerConnector);
- } else if (!keyValue.equals(workerConnector.getKeyValue())) {
- workerConnector.reconfigure(keyValue);
+ try {
+ String connectorName = workerConnector.getConnectorName();
+ ConnectKeyValue keyValue = connectorConfigs.get(connectorName);
+ if (null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED)) {
+ workerConnector.stop();
+ log.info("Connector {} stop", workerConnector.getConnectorName());
+ stoppedConnector.add(workerConnector);
+ } else if (!keyValue.equals(workerConnector.getKeyValue())) {
+ workerConnector.reconfigure(keyValue);
+ }
+ } catch (Exception e) {
+ log.error("stop or reconfigure connector error, connectName: " + workerConnector.getConnectorName(), e);
}
}
workingConnectors.removeAll(stoppedConnector);
@@ -193,28 +196,32 @@ public class Worker {
}
for (String connectorName : newConnectors.keySet()) {
- ConnectKeyValue keyValue = newConnectors.get(connectorName);
- String connectorClass = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
- ClassLoader loader = plugin.getPluginClassLoader(connectorClass);
- final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
- Class clazz;
- boolean isolationFlag = false;
- if (loader instanceof PluginClassLoader) {
- clazz = ((PluginClassLoader) loader).loadClass(connectorClass, false);
- isolationFlag = true;
- } else {
- clazz = Class.forName(connectorClass);
- }
- final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
- WorkerConnector workerConnector = new WorkerConnector(connectorName, connector, connectorConfigs.get(connectorName), new DefaultConnectorContext(connectorName, connectController));
- if (isolationFlag) {
- Plugin.compareAndSwapLoaders(loader);
+ try {
+ ConnectKeyValue keyValue = newConnectors.get(connectorName);
+ String connectorClass = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
+ ClassLoader loader = plugin.getPluginClassLoader(connectorClass);
+ final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
+ Class clazz;
+ boolean isolationFlag = false;
+ if (loader instanceof PluginClassLoader) {
+ clazz = ((PluginClassLoader) loader).loadClass(connectorClass, false);
+ isolationFlag = true;
+ } else {
+ clazz = Class.forName(connectorClass);
+ }
+ final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
+ WorkerConnector workerConnector = new WorkerConnector(connectorName, connector, connectorConfigs.get(connectorName), new DefaultConnectorContext(connectorName, connectController));
+ if (isolationFlag) {
+ Plugin.compareAndSwapLoaders(loader);
+ }
+ workerConnector.initialize();
+ workerConnector.start();
+ log.info("Connector {} start", workerConnector.getConnectorName());
+ Plugin.compareAndSwapLoaders(currentThreadLoader);
+ this.workingConnectors.add(workerConnector);
+ } catch (Exception e) {
+ log.error("worker connector start exception. workerName: " + connectorName, e);
}
- workerConnector.initialize();
- workerConnector.start();
- log.info("Connector {} start", workerConnector.getConnectorName());
- Plugin.compareAndSwapLoaders(currentThreadLoader);
- this.workingConnectors.add(workerConnector);
}
}
@@ -231,7 +238,6 @@ public class Worker {
}
}
-
private boolean isConfigInSet(ConnectKeyValue keyValue, Set<Runnable> set) {
for (Runnable runnable : set) {
ConnectKeyValue taskConfig = null;
@@ -268,15 +274,15 @@ public class Worker {
}
public void setWorkingConnectors(
- Set<WorkerConnector> workingConnectors) {
+ Set<WorkerConnector> workingConnectors) {
this.workingConnectors = workingConnectors;
}
-
/**
* Beaware that we are not creating a defensive copy of these tasks
* So developers should only use these references for read-only purposes.
* These variables should be immutable
+ *
* @return
*/
public Set<Runnable> getWorkingTasks() {
@@ -311,7 +317,6 @@ public class Worker {
this.runningTasks = workingTasks;
}
-
public void maintainConnectorState() {
}
@@ -332,7 +337,6 @@ public class Worker {
List<ConnectKeyValue> keyValues = taskConfigs.get(connectorName);
WorkerTaskState state = ((WorkerTask) runnable).getState();
-
if (WorkerTaskState.ERROR == state) {
errorTasks.add(runnable);
runningTasks.remove(runnable);
@@ -347,10 +351,12 @@ public class Worker {
}
}
-
if (needStop) {
- workerTask.stop();
-
+ try {
+ workerTask.stop();
+ } catch (Exception e) {
+ log.error("workerTask stop exception, workerTask: " + workerTask.getTaskConfig(), e);
+ }
log.info("Task stopping, connector name {}, config {}", workerTask.getConnectorName(), workerTask.getTaskConfig());
runningTasks.remove(runnable);
stoppingTasks.put(runnable, System.currentTimeMillis());
@@ -393,55 +399,58 @@ public class Worker {
createDirectTask(connectorName, keyValue);
continue;
}
-
- String taskClass = keyValue.getString(RuntimeConfigDefine.TASK_CLASS);
- ClassLoader loader = plugin.getPluginClassLoader(taskClass);
- final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
- Class taskClazz;
- boolean isolationFlag = false;
- if (loader instanceof PluginClassLoader) {
- taskClazz = ((PluginClassLoader) loader).loadClass(taskClass, false);
- isolationFlag = true;
- } else {
- taskClazz = Class.forName(taskClass);
- }
- final Task task = (Task) taskClazz.getDeclaredConstructor().newInstance();
- final String converterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER);
- Converter recordConverter = null;
- if (StringUtils.isNotEmpty(converterClazzName)) {
- Class converterClazz = Class.forName(converterClazzName);
- recordConverter = (Converter) converterClazz.newInstance();
- }
- if (isolationFlag) {
- Plugin.compareAndSwapLoaders(loader);
- }
- if (task instanceof SourceTask) {
- DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(connectConfig);
- TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
- WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName,
- (SourceTask) task, keyValue, positionManagementService, recordConverter, producer, workerState, transformChain);
- Plugin.compareAndSwapLoaders(currentThreadLoader);
-
- Future future = taskExecutor.submit(workerSourceTask);
- taskToFutureMap.put(workerSourceTask, future);
- this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());
- } else if (task instanceof SinkTask) {
- DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig);
- if (connectConfig.isAutoCreateGroupEnable()) {
- ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
+ try {
+ String taskClass = keyValue.getString(RuntimeConfigDefine.TASK_CLASS);
+ ClassLoader loader = plugin.getPluginClassLoader(taskClass);
+ final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
+ Class taskClazz;
+ boolean isolationFlag = false;
+ if (loader instanceof PluginClassLoader) {
+ taskClazz = ((PluginClassLoader) loader).loadClass(taskClass, false);
+ isolationFlag = true;
+ } else {
+ taskClazz = Class.forName(taskClass);
}
- TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
- WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName,
- (SinkTask) task, keyValue, offsetManagementService, recordConverter, consumer, workerState, transformChain);
- Plugin.compareAndSwapLoaders(currentThreadLoader);
- Future future = taskExecutor.submit(workerSinkTask);
- taskToFutureMap.put(workerSinkTask, future);
- this.pendingTasks.put(workerSinkTask, System.currentTimeMillis());
+ final Task task = (Task) taskClazz.getDeclaredConstructor().newInstance();
+ final String converterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER);
+ Converter recordConverter = null;
+ if (StringUtils.isNotEmpty(converterClazzName)) {
+ Class converterClazz = Class.forName(converterClazzName);
+ recordConverter = (Converter) converterClazz.newInstance();
+ }
+ if (isolationFlag) {
+ Plugin.compareAndSwapLoaders(loader);
+ }
+ if (task instanceof SourceTask) {
+ DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(connectConfig);
+ TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
+ WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName,
+ (SourceTask) task, keyValue, positionManagementService, recordConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain);
+ Plugin.compareAndSwapLoaders(currentThreadLoader);
+
+ Future future = taskExecutor.submit(workerSourceTask);
+ taskToFutureMap.put(workerSourceTask, future);
+ this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());
+ } else if (task instanceof SinkTask) {
+ log.info("sink task config keyValue is {}", keyValue.getProperties());
+ DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig, connectorName, keyValue);
+ if (connectConfig.isAutoCreateGroupEnable()) {
+ ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
+ }
+ TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
+ WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName,
+ (SinkTask) task, keyValue, recordConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain);
+ Plugin.compareAndSwapLoaders(currentThreadLoader);
+ Future future = taskExecutor.submit(workerSinkTask);
+ taskToFutureMap.put(workerSinkTask, future);
+ this.pendingTasks.put(workerSinkTask, System.currentTimeMillis());
+ }
+ } catch (Exception e) {
+ log.error("start worker task exception. config {}" + JSON.toJSONString(keyValue), e);
}
}
}
-
// STEP 3: check all pending state
for (Map.Entry<Runnable, Long> entry : pendingTasks.entrySet()) {
Runnable runnable = entry.getKey();
@@ -503,7 +512,7 @@ public class Worker {
}
// STEP 5 check errorTasks and stopped tasks
- for (Runnable runnable: errorTasks) {
+ for (Runnable runnable : errorTasks) {
WorkerTask workerTask = (WorkerTask) runnable;
Future future = taskToFutureMap.get(runnable);
@@ -515,7 +524,7 @@ public class Worker {
}
} catch (ExecutionException e) {
Throwable t = e.getCause();
- } catch (CancellationException | TimeoutException | InterruptedException e) {
+ } catch (CancellationException | TimeoutException | InterruptedException e) {
} finally {
future.cancel(true);
@@ -527,9 +536,8 @@ public class Worker {
}
}
-
// STEP 5 check errorTasks and stopped tasks
- for (Runnable runnable: stoppedTasks) {
+ for (Runnable runnable : stoppedTasks) {
WorkerTask workerTask = (WorkerTask) runnable;
workerTask.cleanup();
Future future = taskToFutureMap.get(runnable);
@@ -552,8 +560,7 @@ public class Worker {
} catch (InterruptedException e) {
log.info("[BUG] Stopped Tasks should not throw any exception");
e.printStackTrace();
- }
- finally {
+ } finally {
future.cancel(true);
taskToFutureMap.remove(runnable);
stoppedTasks.remove(runnable);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 0499534..7b1721b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -27,7 +27,6 @@ import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.errors.ConnectException;
import io.openmessaging.connector.api.errors.RetriableException;
-import io.openmessaging.connector.api.storage.OffsetStorageReader;
import io.openmessaging.internal.DefaultKeyValue;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -41,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -60,8 +60,8 @@ import org.apache.rocketmq.connect.runtime.common.QueueState;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
import org.apache.rocketmq.connect.runtime.converter.RocketMQConverter;
-import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
-import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
@@ -115,13 +115,6 @@ public class WorkerSinkTask implements WorkerTask {
*/
private final DefaultMQPullConsumer consumer;
- private final PositionManagementService offsetManagementService;
-
- /**
- *
- */
- private final OffsetStorageReader offsetStorageReader;
-
/**
* A converter to parse sink data entry to object.
*/
@@ -152,6 +145,10 @@ public class WorkerSinkTask implements WorkerTask {
private final AtomicReference<WorkerState> workerState;
+ private final ConnectStatsManager connectStatsManager;
+
+ private final ConnectStatsService connectStatsService;
+
private final CountDownLatch stopPullMsgLatch;
private WorkerSinkTaskContext sinkTaskContext;
@@ -163,25 +160,38 @@ public class WorkerSinkTask implements WorkerTask {
public static final String TOPIC = "topic";
public static final String QUEUE_OFFSET = "queueOffset";
+ private static final Set<String> MQ_SYS_KEYS = new HashSet<String>() {
+ {
+ add("MIN_OFFSET");
+ add("TRACE_ON");
+ add("MAX_OFFSET");
+ add("MSG_REGION");
+ add("UNIQ_KEY");
+ add("WAIT");
+ add("TAGS");
+ }
+ };
+
public WorkerSinkTask(String connectorName,
SinkTask sinkTask,
ConnectKeyValue taskConfig,
- PositionManagementService offsetManagementService,
Converter recordConverter,
DefaultMQPullConsumer consumer,
AtomicReference<WorkerState> workerState,
+ ConnectStatsManager connectStatsManager,
+ ConnectStatsService connectStatsService,
TransformChain<ConnectRecord> transformChain) {
this.connectorName = connectorName;
this.sinkTask = sinkTask;
this.taskConfig = taskConfig;
this.consumer = consumer;
- this.offsetManagementService = offsetManagementService;
- this.offsetStorageReader = new PositionStorageReaderImpl(offsetManagementService);
this.recordConverter = recordConverter;
this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256);
this.messageQueuesStateMap = new ConcurrentHashMap<>(256);
this.state = new AtomicReference<>(WorkerTaskState.NEW);
this.workerState = workerState;
+ this.connectStatsManager = connectStatsManager;
+ this.connectStatsService = connectStatsService;
this.stopPullMsgLatch = new CountDownLatch(1);
this.transformChain = transformChain;
}
@@ -210,13 +220,19 @@ public class WorkerSinkTask implements WorkerTask {
setQueueOffset();
pullMessageFromQueues();
} catch (RetriableException e) {
+ connectStatsManager.incSinkRecordPutTotalFailNums();
+ connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
log.error("Sink task RetriableException exception", e);
} catch (InterruptedException e) {
+ connectStatsManager.incSinkRecordPutTotalFailNums();
+ connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
log.error("Sink task InterruptedException exception", e);
throw e;
} catch (Throwable e) {
state.set(WorkerTaskState.ERROR);
- log.error("sink task {}, pull message MQClientException, Error {} ", this, e.getMessage(), e);
+ log.error(" sink task {},pull message MQClientException, Error {} ", this, e.getMessage(), e);
+ connectStatsManager.incSinkRecordPutTotalFailNums();
+ connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
}
}
@@ -230,7 +246,7 @@ public class WorkerSinkTask implements WorkerTask {
} finally {
if (consumer != null) {
consumer.shutdown();
- log.info("Sink task consumer shutdown.");
+ log.info("Sink task consumer shutdown. config:{}", JSON.toJSONString(taskConfig));
}
}
}
@@ -350,26 +366,52 @@ public class WorkerSinkTask implements WorkerTask {
break;
}
PullResult pullResult = null;
+ final long beginPullMsgTimestamp = System.currentTimeMillis();
try {
shouldStopPullMsg();
pullResult = consumer.pullBlockIfNotFound(entry.getKey(), "*", entry.getValue(), MAX_MESSAGE_NUM);
pullMsgErrorCount = 0;
} catch (MQClientException e) {
pullMsgErrorCount++;
- log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQClientException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+ log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQClientException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+ connectStatsManager.incSinkRecordReadTotalFailNums();
+ connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+ long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
+ connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
+ connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
} catch (RemotingException e) {
pullMsgErrorCount++;
- log.error(" sink task message queue {}, offset {}, taskconfig {},pull message RemotingException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+ log.error(" sink task message queue {}, offset {}, taskconfig {},pull message RemotingException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+ connectStatsManager.incSinkRecordReadTotalFailNums();
+ connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+ long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
+ connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
+ connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
} catch (MQBrokerException e) {
pullMsgErrorCount++;
- log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQBrokerException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+ log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQBrokerException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+ connectStatsManager.incSinkRecordReadTotalFailNums();
+ connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+ long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
+ connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
+ connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
} catch (InterruptedException e) {
pullMsgErrorCount++;
- log.error(" sink task message queue {}, offset {}, taskconfig {},pull message InterruptedException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+ log.error(" sink task message queue {}, offset {}, taskconfig {},pull message InterruptedException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+ connectStatsManager.incSinkRecordReadTotalFailNums();
+ connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+ long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
+ connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
+ connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
throw e;
} catch (Throwable e) {
pullMsgErrorCount++;
- log.error(" sink task message queue {}, offset {}, taskconfig {},pull message Throwable, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), e);
+ log.error(" sink task message queue {}, offset {}, taskconfig {},pull message Throwable, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), e);
+ connectStatsManager.incSinkRecordReadTotalFailNums();
+ connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+ long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
+ connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
+ connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
throw e;
}
long currentTime = System.currentTimeMillis();
@@ -379,11 +421,16 @@ public class WorkerSinkTask implements WorkerTask {
if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.FOUND)) {
this.incPullTPS(entry.getKey().getTopic(), pullResult.getMsgFoundList().size());
messages = pullResult.getMsgFoundList();
+ connectStatsManager.incSinkRecordReadTotalNums();
+ connectStatsManager.incSinkRecordReadNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID), messages.size());
+ long pullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
+ connectStatsManager.incSinkRecordReadTotalRT(pullRT);
+ connectStatsManager.incSinkRecordReadRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), pullRT);
receiveMessages(messages);
if (messageQueuesOffsetMap.containsKey(entry.getKey())) {
messageQueuesOffsetMap.put(entry.getKey(), pullResult.getNextBeginOffset());
} else {
- log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey()));
+ log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey()));
}
try {
consumer.updateConsumeOffset(entry.getKey(), pullResult.getNextBeginOffset());
@@ -401,6 +448,11 @@ public class WorkerSinkTask implements WorkerTask {
} else {
log.info("no new message, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
}
+
+ AtomicLong atomicLong = connectStatsService.singleSinkTaskTimesTotal(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+ if (null != atomicLong) {
+ atomicLong.addAndGet(org.apache.commons.collections4.CollectionUtils.isEmpty(messages) ? 0 : messages.size());
+ }
}
}
@@ -516,7 +568,13 @@ public class WorkerSinkTask implements WorkerTask {
KeyValue keyValue = new DefaultKeyValue();
if (MapUtils.isNotEmpty(properties)) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
- keyValue.put(entry.getKey(), entry.getValue());
+ if (MQ_SYS_KEYS.contains(entry.getKey())) {
+ keyValue.put("MQ-SYS-" + entry.getKey(), entry.getValue());
+ } else if (entry.getKey().startsWith("connect-ext-")){
+ keyValue.put(entry.getKey().replaceAll("connect-ext-", ""), entry.getValue());
+ } else {
+ keyValue.put(entry.getKey(), entry.getValue());
+ }
}
}
sinkDataEntry.addExtension(keyValue);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 5da46b7..36a10fd 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -19,6 +19,7 @@
package org.apache.rocketmq.connect.runtime.connectorwrapper;
import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.source.SourceTask;
import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
import io.openmessaging.connector.api.data.ConnectRecord;
@@ -26,14 +27,15 @@ import io.openmessaging.connector.api.data.Converter;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.RecordPosition;
-import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.errors.ConnectException;
import io.openmessaging.connector.api.errors.RetriableException;
import io.openmessaging.connector.api.storage.OffsetStorageReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -47,6 +49,8 @@ import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.converter.RocketMQConverter;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
@@ -100,6 +104,10 @@ public class WorkerSourceTask implements WorkerTask {
private final AtomicReference<WorkerState> workerState;
+ private ConnectStatsManager connectStatsManager;
+
+ private ConnectStatsService connectStatsService;
+
private List<ConnectRecord> toSendRecord;
private TransformChain<ConnectRecord> transformChain;
@@ -111,6 +119,8 @@ public class WorkerSourceTask implements WorkerTask {
Converter recordConverter,
DefaultMQProducer producer,
AtomicReference<WorkerState> workerState,
+ ConnectStatsManager connectStatsManager,
+ ConnectStatsService connectStatsService,
TransformChain<ConnectRecord> transformChain) {
this.connectorName = connectorName;
this.sourceTask = sourceTask;
@@ -121,6 +131,8 @@ public class WorkerSourceTask implements WorkerTask {
this.recordConverter = recordConverter;
this.state = new AtomicReference<>(WorkerTaskState.NEW);
this.workerState = workerState;
+ this.connectStatsManager = connectStatsManager;
+ this.connectStatsService = connectStatsService;
this.transformChain = transformChain;
}
@@ -155,22 +167,31 @@ public class WorkerSourceTask implements WorkerTask {
try {
toSendRecord = poll();
if (null != toSendRecord && toSendRecord.size() > 0) {
+ connectStatsManager.incSourceRecordPollTotalNums();
+ connectStatsManager.incSourceRecordPollNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
sendRecord();
}
} catch (RetriableException e) {
+ connectStatsManager.incSourceRecordPollTotalFailNums();
+ connectStatsManager.incSourceRecordPollFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
log.error("Source task RetriableException exception", e);
} catch (Exception e) {
+ connectStatsManager.incSourceRecordPollTotalFailNums();
+ connectStatsManager.incSourceRecordPollFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
log.error("Source task RetriableException exception", e);
state.set(WorkerTaskState.ERROR);
}
}
-
+ AtomicLong atomicLong = connectStatsService.singleSourceTaskTimesTotal(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+ if (null != atomicLong) {
+ atomicLong.addAndGet(toSendRecord == null ? 0 : toSendRecord.size());
+ }
}
sourceTask.stop();
state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED);
log.info("Source task stop, config:{}", JSON.toJSONString(taskConfig));
} catch (Exception e) {
- log.error("Run task failed., task config {}", JSON.toJSONString(taskConfig), e);
+ log.error("Run task failed., task config: " + JSON.toJSONString(taskConfig), e);
state.set(WorkerTaskState.ERROR);
} finally {
if (producer != null) {
@@ -184,6 +205,9 @@ public class WorkerSourceTask implements WorkerTask {
List<ConnectRecord> connectRecordList = null;
try {
connectRecordList = sourceTask.poll();
+ if (CollectionUtils.isEmpty(connectRecordList)) {
+ return null;
+ }
List<ConnectRecord> connectRecordList1 = new ArrayList<>(32);
for (ConnectRecord connectRecord : connectRecordList) {
ConnectRecord connectRecord1 = this.transformChain.doTransforms(connectRecord);
@@ -249,6 +273,9 @@ public class WorkerSourceTask implements WorkerTask {
}
topic = (String) o;
}
+ if (StringUtils.isBlank(topic)) {
+ throw new ConnectException("source connect lack of topic config");
+ }
sourceMessage.setTopic(topic);
if (null == recordConverter || recordConverter instanceof RocketMQConverter) {
putExtendMsgProperty(sourceDataEntry, sourceMessage, topic);
@@ -272,7 +299,9 @@ public class WorkerSourceTask implements WorkerTask {
try {
producer.send(sourceMessage, new SendCallback() {
@Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) {
- log.info("Successful send message to RocketMQ:{}", result.getMsgId());
+ log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
+ connectStatsManager.incSourceRecordWriteTotalNums();
+ connectStatsManager.incSourceRecordWriteNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
RecordPartition partition = position.getPartition();
try {
if (null != partition && null != position) {
@@ -288,14 +317,22 @@ public class WorkerSourceTask implements WorkerTask {
@Override public void onException(Throwable throwable) {
log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(sourceMessage), throwable);
+ connectStatsManager.incSourceRecordWriteTotalFailNums();
+ connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
}
});
} catch (MQClientException e) {
log.error("Send message MQClientException. message: {}, error info: {}.", sourceMessage, e);
+ connectStatsManager.incSourceRecordWriteTotalFailNums();
+ connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
} catch (RemotingException e) {
log.error("Send message RemotingException. message: {}, error info: {}.", sourceMessage, e);
+ connectStatsManager.incSourceRecordWriteTotalFailNums();
+ connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
} catch (InterruptedException e) {
log.error("Send message InterruptedException. message: {}, error info: {}.", sourceMessage, e);
+ connectStatsManager.incSourceRecordWriteTotalFailNums();
+ connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
throw e;
}
}
@@ -303,34 +340,18 @@ public class WorkerSourceTask implements WorkerTask {
}
private void putExtendMsgProperty(ConnectRecord sourceDataEntry, Message sourceMessage, String topic) {
- String shardingKey = sourceDataEntry.getExtension(RuntimeConfigDefine.CONNECT_SHARDINGKEY);
- if (StringUtils.isNotEmpty(shardingKey)) {
- MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_SHARDINGKEY, shardingKey);
- }
- if (StringUtils.isNotEmpty(topic)) {
- MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_TOPICNAME, topic);
- }
- String sourcePartition = sourceDataEntry.getExtension(RuntimeConfigDefine.CONNECT_SOURCE_PARTITION);
- if (StringUtils.isNotEmpty(sourcePartition)) {
- MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_SOURCE_PARTITION, sourcePartition);
- }
- String sourcePosition = sourceDataEntry.getExtension(RuntimeConfigDefine.CONNECT_SOURCE_POSITION);
- if (StringUtils.isNotEmpty(sourcePosition)) {
- MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_SOURCE_POSITION, sourcePosition);
- }
- String entryType = sourceDataEntry.getExtension(RuntimeConfigDefine.CONNECT_ENTRYTYPE);
- if (StringUtils.isNotEmpty(entryType)) {
- MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_ENTRYTYPE, entryType);
+ KeyValue extensionKeyValues = sourceDataEntry.getExtensions();
+ if (null == extensionKeyValues) {
+ log.info("extension key value is null.");
+ return;
}
- Long timestamp = sourceDataEntry.getTimestamp();
- Optional<Long> otimestamp = Optional.ofNullable(timestamp);
- if (otimestamp.isPresent()) {
- MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_TIMESTAMP, otimestamp.get().toString());
+ Set<String> keySet = extensionKeyValues.keySet();
+ if (CollectionUtils.isEmpty(keySet)) {
+ log.info("extension keySet null.");
+ return;
}
- Schema schema = sourceDataEntry.getSchema();
- Optional<Schema> oschema = Optional.ofNullable(schema);
- if (oschema.isPresent()) {
- MessageAccessor.putProperty(sourceMessage, RuntimeConfigDefine.CONNECT_SCHEMA, JSON.toJSONString(oschema.get()));
+ for (String key : keySet) {
+ MessageAccessor.putProperty(sourceMessage, "connect-ext-" + key, extensionKeyValues.getString(key));
}
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
index b9a7fe3..b2a3a5b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
@@ -57,6 +57,7 @@ public class RestHandler {
app.get("/connectors/enableAll", this::handleEnableAllConnector);
app.get("/connectors/disableAll", this::handleDisableAllConnector);
app.get("/connectors/:connectorName", this::handleCreateConnector);
+ app.post("/connectors/:connectorName", this::handleCreateConnector);
app.get("/connectors/:connectorName/config", this::handleQueryConnectorConfig);
app.get("/connectors/:connectorName/status", this::handleQueryConnectorStatus);
app.get("/connectors/:connectorName/stop", this::handleStopConnector);
@@ -124,7 +125,12 @@ public class RestHandler {
private void handleCreateConnector(Context context) {
String connectorName = context.pathParam("connectorName");
- String arg = context.req.getParameter("config");
+ String arg;
+ if (context.req.getMethod().equals("POST")) {
+ arg = context.body();
+ } else {
+ arg = context.req.getParameter("config");
+ }
if (arg == null) {
context.result("failed! query param 'config' is required ");
return;
@@ -145,7 +151,7 @@ public class RestHandler {
}
} catch (Exception e) {
log.error("Handle createConnector error .", e);
- context.result("failed");
+ context.result("failed:" + e.getMessage());
}
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 653e6e9..354fca8 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -197,6 +197,12 @@ public class ConfigManagementServiceImpl implements ConfigManagementService {
newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAMES));
+ Set<String> connectConfigKeySet = configs.keySet();
+ for (String connectConfigKey : connectConfigKeySet) {
+ if (connectConfigKey.startsWith(RuntimeConfigDefine.TRANSFORMS)) {
+ newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
+ }
+ }
converterdConfigs.add(newKeyValue);
}
putTaskConfigs(connectorName, converterdConfigs);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/TaskPositionCommitService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/TaskPositionCommitService.java
index 9b8764d..72fa0b9 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/TaskPositionCommitService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/TaskPositionCommitService.java
@@ -36,15 +36,10 @@ public class TaskPositionCommitService extends ServiceThread {
private final PositionManagementService positionManagementService;
- private final PositionManagementService offsetManagementService;
-
-
public TaskPositionCommitService(Worker worker,
- PositionManagementService positionManagementService,
- PositionManagementService offsetManagementService) {
+ PositionManagementService positionManagementService) {
this.worker = worker;
this.positionManagementService = positionManagementService;
- this.offsetManagementService = offsetManagementService;
}
@Override
@@ -67,8 +62,6 @@ public class TaskPositionCommitService extends ServiceThread {
public void commitTaskPosition() {
positionManagementService.persist();
- offsetManagementService.persist();
positionManagementService.synchronize();
- offsetManagementService.synchronize();
}
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStats.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStats.java
new file mode 100644
index 0000000..ed462cb
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStats.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.rocketmq.connect.runtime.stats;
+
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class ConnectStats {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_CONNECT_STATS);
+
+ private final ConnectStatsService connectStatsService;
+
+ private volatile long msgPutTotalYesterdayMorning;
+
+ private volatile long msgPutTotalTodayMorning;
+
+ private volatile long msgGetTotalYesterdayMorning;
+
+ private volatile long msgGetTotalTodayMorning;
+
+ public ConnectStats(ConnectStatsService connectStatsService) {
+ this.connectStatsService = connectStatsService;
+ }
+
+ public void record() {
+ this.msgPutTotalYesterdayMorning = this.msgPutTotalTodayMorning;
+ this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning;
+
+ log.info("yesterday put message total: {}", msgPutTotalTodayMorning - msgPutTotalYesterdayMorning);
+ log.info("yesterday get message total: {}", msgGetTotalTodayMorning - msgGetTotalYesterdayMorning);
+ }
+
+ public long getMsgPutTotalYesterdayMorning() {
+ return msgPutTotalYesterdayMorning;
+ }
+
+ public void setMsgPutTotalYesterdayMorning(long msgPutTotalYesterdayMorning) {
+ this.msgPutTotalYesterdayMorning = msgPutTotalYesterdayMorning;
+ }
+
+ public long getMsgPutTotalTodayMorning() {
+ return msgPutTotalTodayMorning;
+ }
+
+ public void setMsgPutTotalTodayMorning(long msgPutTotalTodayMorning) {
+ this.msgPutTotalTodayMorning = msgPutTotalTodayMorning;
+ }
+
+ public long getMsgGetTotalYesterdayMorning() {
+ return msgGetTotalYesterdayMorning;
+ }
+
+ public void setMsgGetTotalYesterdayMorning(long msgGetTotalYesterdayMorning) {
+ this.msgGetTotalYesterdayMorning = msgGetTotalYesterdayMorning;
+ }
+
+ public long getMsgGetTotalTodayMorning() {
+ return msgGetTotalTodayMorning;
+ }
+
+ public void setMsgGetTotalTodayMorning(long msgGetTotalTodayMorning) {
+ this.msgGetTotalTodayMorning = msgGetTotalTodayMorning;
+ }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
new file mode 100644
index 0000000..b7a03e9
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.rocketmq.connect.runtime.stats;
+
+import java.util.HashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.stats.StatsItem;
+import org.apache.rocketmq.common.stats.StatsItemSet;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+import static org.apache.rocketmq.connect.runtime.common.LoggerName.ROCKETMQ_CONNECT_STATS;
+
+public class ConnectStatsManager {
+
+ public static final String SOURCE_RECORD_WRITE_NUMS = "SOURCE_RECORD_WRITE_NUMS";
+ public static final String SOURCE_RECORD_WRITE_TOTAL_NUMS = "SOURCE_RECORD_WRITE_TOTAL_NUMS";
+
+ public static final String SOURCE_RECORD_WRITE_FAIL_NUMS = "SOURCE_RECORD_WRITE_FAIL_NUMS";
+ public static final String SOURCE_RECORD_WRITE_TOTAL_FAIL_NUMS = "SOURCE_RECORD_WRITE_TOTAL_FAIL_NUMS";
+
+ public static final String SOURCE_RECORD_POLL_NUMS = "SOURCE_RECORD_POLL_NUMS";
+ public static final String SOURCE_RECORD_POLL_TOTAL_NUMS = "SOURCE_RECORD_POLL_TOTAL_NUMS";
+
+ public static final String SOURCE_RECORD_POLL_FAIL_NUMS = "SOURCE_RECORD_POLL_FAIL_NUMS";
+ public static final String SOURCE_RECORD_POLL_FAIL_TOTAL_NUMS = "SOURCE_RECORD_POLL_FAIL_TOTAL_NUMS";
+
+ public static final String SINK_RECORD_READ_NUMS = "SINK_RECORD_READ_NUMS";
+ public static final String SINK_RECORD_READ_TOTAL_NUMS = "SINK_RECORD_READ_TOTAL_NUMS";
+
+ public static final String SINK_RECORD_READ_RT = "SINK_RECORD_READ_RT";
+ public static final String SINK_RECORD_READ_TOTAL_RT = "SINK_RECORD_READ_TOTAL_RT";
+
+ public static final String SINK_RECORD_READ_FAIL_NUMS = "SINK_RECORD_READ_FAIL_NUMS";
+ public static final String SINK_RECORD_READ_TOTAL_FAIL_NUMS = "SINK_RECORD_READ_TOTAL_FAIL_NUMS";
+
+ public static final String SINK_RECORD_READ_FAIL_RT = "SINK_RECORD_READ_FAIL_RT";
+ public static final String SINK_RECORD_READ_TOTAL_FAIL_RT = "SINK_RECORD_READ_TOTAL_FAIL_RT";
+
+ public static final String SINK_RECORD_PUT_NUMS = "SINK_RECORD_PUT_NUMS";
+ public static final String SINK_RECORD_PUT_TOTAL_NUMS = "SINK_RECORD_PUT_TOTAL_NUMS";
+
+ public static final String SINK_RECORD_PUT_RT = "SINK_RECORD_PUT_RT";
+ public static final String SINK_RECORD_PUT_TOTAL_RT = "SINK_RECORD_PUT_TOTAL_RT";
+
+ public static final String SINK_RECORD_PUT_FAIL_NUMS = "SINK_RECORD_PUT_FAIL_NUMS";
+ public static final String SINK_RECORD_PUT_TOTAL_FAIL_NUMS = "SINK_RECORD_PUT_TOTAL_FAIL_NUMS";
+
+ public static final String SINK_RECORD_PUT_FAIL_RT = "SINK_RECORD_PUT_FAIL_RT";
+ public static final String SINK_RECORD_PUT_TOTAL_FAIL_RT = "SINK_RECORD_PUT_TOTAL_FAIL_RT";
+
+ /**
+ * read disk follow stats
+ */
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(ROCKETMQ_CONNECT_STATS);
+
+
+ private static final InternalLogger COMMERCIAL_LOG = InternalLoggerFactory.getLogger(
+ LoggerName.COMMERCIAL_LOGGER_NAME);
+ private final ScheduledExecutorService scheduledExecutorService =
+ ThreadUtils.newSingleThreadScheduledExecutor("ConnectStatsThread", true);
+ private final ScheduledExecutorService commercialExecutor =
+ ThreadUtils.newSingleThreadScheduledExecutor("CommercialStatsThread", true);
+ private final ScheduledExecutorService accountExecutor =
+ ThreadUtils.newSingleThreadScheduledExecutor("AccountStatsThread", true);
+
+ private final HashMap<String, StatsItemSet> statsTable = new HashMap<String, StatsItemSet>();
+ private final String worker;
+ private ConnectConfig connectConfig;
+
+ public ConnectStatsManager(ConnectConfig connectConfig) {
+ this.connectConfig = connectConfig;
+ this.worker = connectConfig.getWorkerId();
+ init();
+ }
+
+ public void init() {
+ this.statsTable.put(SOURCE_RECORD_WRITE_NUMS, new StatsItemSet(SOURCE_RECORD_WRITE_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SOURCE_RECORD_WRITE_TOTAL_NUMS, new StatsItemSet(SOURCE_RECORD_WRITE_TOTAL_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SOURCE_RECORD_WRITE_FAIL_NUMS, new StatsItemSet(SOURCE_RECORD_WRITE_FAIL_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SOURCE_RECORD_WRITE_TOTAL_FAIL_NUMS, new StatsItemSet(SOURCE_RECORD_WRITE_TOTAL_FAIL_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SOURCE_RECORD_POLL_NUMS, new StatsItemSet(SOURCE_RECORD_POLL_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SOURCE_RECORD_POLL_TOTAL_NUMS, new StatsItemSet(SOURCE_RECORD_POLL_TOTAL_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SOURCE_RECORD_POLL_FAIL_NUMS, new StatsItemSet(SOURCE_RECORD_POLL_FAIL_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SOURCE_RECORD_POLL_FAIL_TOTAL_NUMS, new StatsItemSet(SOURCE_RECORD_POLL_FAIL_TOTAL_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_READ_NUMS, new StatsItemSet(SINK_RECORD_READ_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_READ_TOTAL_NUMS, new StatsItemSet(SINK_RECORD_READ_TOTAL_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_READ_FAIL_NUMS, new StatsItemSet(SINK_RECORD_READ_FAIL_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_READ_TOTAL_FAIL_NUMS, new StatsItemSet(SINK_RECORD_READ_TOTAL_FAIL_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_PUT_NUMS, new StatsItemSet(SINK_RECORD_PUT_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_PUT_TOTAL_NUMS, new StatsItemSet(SINK_RECORD_PUT_TOTAL_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_PUT_FAIL_NUMS, new StatsItemSet(SINK_RECORD_PUT_FAIL_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_PUT_TOTAL_FAIL_NUMS, new StatsItemSet(SINK_RECORD_PUT_TOTAL_FAIL_NUMS, this.scheduledExecutorService, log));
+
+ this.statsTable.put(SINK_RECORD_READ_RT, new StatsItemSet(SINK_RECORD_READ_RT, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_READ_TOTAL_RT, new StatsItemSet(SINK_RECORD_READ_TOTAL_RT, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_READ_FAIL_RT, new StatsItemSet(SINK_RECORD_READ_FAIL_RT, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_READ_TOTAL_FAIL_RT, new StatsItemSet(SINK_RECORD_READ_TOTAL_FAIL_RT, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_PUT_RT, new StatsItemSet(SINK_RECORD_PUT_RT, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_PUT_TOTAL_RT, new StatsItemSet(SINK_RECORD_PUT_TOTAL_RT, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_PUT_FAIL_RT, new StatsItemSet(SINK_RECORD_PUT_FAIL_RT, this.scheduledExecutorService, log));
+ this.statsTable.put(SINK_RECORD_PUT_TOTAL_FAIL_RT, new StatsItemSet(SINK_RECORD_PUT_TOTAL_FAIL_RT, this.scheduledExecutorService, log));
+ }
+
+ public void start() {
+ }
+
+ public void shutdown() {
+ this.scheduledExecutorService.shutdown();
+ this.commercialExecutor.shutdown();
+ }
+
+ public StatsItem getStatsItem(final String statsName, final String statsKey) {
+ try {
+ return this.statsTable.get(statsName).getStatsItem(statsKey);
+ } catch (Exception e) {
+ }
+
+ return null;
+ }
+
+ public void incSourceRecordPollTotalNums() {
+ this.statsTable.get(SOURCE_RECORD_POLL_TOTAL_NUMS).addValue(worker, 1, 1);
+
+ }
+
+ public void incSourceRecordPollNums(String taskId) {
+ if (StringUtils.isBlank(taskId)) {
+ return;
+ }
+ this.statsTable.get(SOURCE_RECORD_POLL_NUMS).addValue(taskId, 1, 1);
+ }
+
+ public void incSourceRecordPollTotalFailNums() {
+ this.statsTable.get(SOURCE_RECORD_POLL_FAIL_TOTAL_NUMS).addValue(worker, 1, 1);
+
+ }
+
+ public void incSourceRecordPollFailNums(String taskId) {
+ if (StringUtils.isBlank(taskId)) {
+ return;
+ }
+ this.statsTable.get(SOURCE_RECORD_POLL_FAIL_NUMS).addValue(taskId, 1, 1);
+ }
+
+ public void incSourceRecordWriteTotalNums() {
+ this.statsTable.get(SOURCE_RECORD_WRITE_TOTAL_NUMS).addValue(worker, 1, 1);
+ }
+
+ public void incSourceRecordWriteNums(String taskId) {
+ if (StringUtils.isBlank(taskId)) {
+ return;
+ }
+ this.statsTable.get(SOURCE_RECORD_WRITE_NUMS).addValue(taskId, 1, 1);
+ }
+
+ public void incSourceRecordWriteTotalFailNums() {
+ this.statsTable.get(SOURCE_RECORD_WRITE_TOTAL_FAIL_NUMS).addValue(worker, 1, 1);
+ }
+
+ public void incSourceRecordWriteFailNums(String taskId) {
+ if (StringUtils.isBlank(taskId)) {
+ return;
+ }
+ this.statsTable.get(SOURCE_RECORD_WRITE_FAIL_NUMS).addValue(taskId, 1, 1);
+ }
+
+ public void incSinkRecordPutTotalFailNums() {
+ this.statsTable.get(SINK_RECORD_PUT_TOTAL_FAIL_NUMS).addValue(worker, 1, 1);
+ }
+
+ public void incSinkRecordPutFailNums(String taskId) {
+ if (StringUtils.isBlank(taskId)) {
+ return;
+ }
+ this.statsTable.get(SINK_RECORD_PUT_FAIL_NUMS).addValue(taskId, 1, 1);
+ }
+
+ public void incSinkRecordReadTotalFailNums() {
+ this.statsTable.get(SINK_RECORD_READ_TOTAL_FAIL_NUMS).addValue(worker, 1, 1);
+ }
+
+ public void incSinkRecordReadFailNums(String taskId) {
+ if (StringUtils.isBlank(taskId)) {
+ return;
+ }
+ this.statsTable.get(SINK_RECORD_READ_FAIL_NUMS).addValue(taskId, 1, 1);
+ }
+
+ public void incSinkRecordReadTotalNums() {
+ this.statsTable.get(SINK_RECORD_READ_TOTAL_NUMS).addValue(worker, 1, 1);
+ }
+
+ public void incSinkRecordReadNums(String taskId) {
+ if (StringUtils.isBlank(taskId)) {
+ return;
+ }
+ this.statsTable.get(SINK_RECORD_READ_NUMS).addValue(taskId, 1, 1);
+ }
+
+ public void incSinkRecordReadNums(String taskId, int incValue) {
+ if (StringUtils.isBlank(taskId)) {
+ return;
+ }
+ this.statsTable.get(SINK_RECORD_READ_NUMS).addValue(taskId, incValue, 1);
+ }
+
+ public void incSinkRecordPutTotalNums() {
+ this.statsTable.get(SINK_RECORD_PUT_TOTAL_NUMS).addValue(worker, 1, 1);
+ }
+
+ public void incSinkRecordPutNums(String taskId) {
+ if (StringUtils.isBlank(taskId)) {
+ return;
+ }
+ this.statsTable.get(SINK_RECORD_PUT_NUMS).addValue(taskId, 1, 1);
+ }
+
+
+
+
+ public void incSinkRecordPutTotalFailRT(final long rt) {
+ this.statsTable.get(SINK_RECORD_PUT_TOTAL_FAIL_RT).addValue(worker, (int) rt, 1);
+ }
+
+ public void incSinkRecordPutFailRT(String taskId, final long rt) {
+ if (StringUtils.isBlank(taskId)) {
+ return;
+ }
+ this.statsTable.get(SINK_RECORD_PUT_FAIL_RT).addValue(taskId, (int) rt, 1);
+ }
+
+ public void incSinkRecordReadTotalFailRT(final long rt) {
+ this.statsTable.get(SINK_RECORD_READ_TOTAL_FAIL_RT).addValue(worker, (int) rt, 1);
+ }
+
+ public void incSinkRecordReadFailRT(String taskId, final long rt) {
+ if (StringUtils.isBlank(taskId)) {
+ return;
+ }
+ this.statsTable.get(SINK_RECORD_READ_FAIL_RT).addValue(taskId, (int) rt, 1);
+ }
+
+ public void incSinkRecordReadTotalRT(final long rt) {
+ this.statsTable.get(SINK_RECORD_READ_TOTAL_RT).addValue(worker, (int) rt, 1);
+ }
+
+ public void incSinkRecordReadRT(String taskId, final long rt) {
+ if (StringUtils.isBlank(taskId)) {
+ return;
+ }
+ this.statsTable.get(SINK_RECORD_READ_RT).addValue(taskId, (int) rt, 1);
+ }
+
+ public void incSinkRecordPutTotalRT(final long rt) {
+ this.statsTable.get(SINK_RECORD_PUT_TOTAL_RT).addValue(worker, (int) rt, 1);
+ }
+
+ public void incSinkRecordPutRT(String taskId, final long rt) {
+ if (StringUtils.isBlank(taskId)) {
+ return;
+ }
+ this.statsTable.get(SINK_RECORD_PUT_RT).addValue(taskId, (int) rt, 1);
+ }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsService.java
new file mode 100644
index 0000000..f92ab7a
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsService.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.rocketmq.connect.runtime.stats;
+
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class ConnectStatsService extends ServiceThread {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_CONNECT_STATS);
+
+ private static final int FREQUENCY_OF_SAMPLING = 1000;
+
+ private static final int MAX_RECORDS_OF_SAMPLING = 60 * 10;
+
+ private static int printTPSInterval = 60 * 1;
+
+ private final ConcurrentMap<String, AtomicLong> sourceTaskTimesTotal =
+ new ConcurrentHashMap<String, AtomicLong>(128);
+ private final ConcurrentMap<String, AtomicLong> sinkTaskTimesTotal =
+ new ConcurrentHashMap<String, AtomicLong>(128);
+
+ private final LinkedList<CallSnapshot> sourceTaskTimesList = new LinkedList<CallSnapshot>();
+ private final LinkedList<CallSnapshot> sinkTaskTimesList = new LinkedList<CallSnapshot>();
+
+ private long connectBootTimestamp = System.currentTimeMillis();
+ private ReentrantLock lockPut = new ReentrantLock();
+ private ReentrantLock lockGet = new ReentrantLock();
+
+ private volatile long dispatchMaxBuffer = 0;
+
+ private ReentrantLock lockSampling = new ReentrantLock();
+ private long lastPrintTimestamp = System.currentTimeMillis();
+
+ public ConnectStatsService() {
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(1024);
+ Long totalTimes = sourceTaskTimesTotal();
+ if (0 == totalTimes) {
+ totalTimes = 1L;
+ }
+ Long sinktotalTimes = sinkTaskTimesTotal();
+ if (0 == sinktotalTimes) {
+ sinktotalTimes = 1L;
+ }
+ sb.append("\truntime: " + this.getFormatRuntime() + "\r\n");
+ sb.append("\tsourceTaskTimesTotal: " + totalTimes + "\r\n");
+ sb.append("\tsinTaskTimesTotal: " + sinktotalTimes + "\r\n");
+ sb.append("\tsourceTps: " + this.getSourceTaskTps() + "\r\n");
+ sb.append("\tgetTotalTps: " + this.getTotalTps() + "\r\n");
+ sb.append("\tsinkTps: " + this.getSinkTaskTps() + "\r\n");
+ return sb.toString();
+ }
+
+ public long sourceTaskTimesTotal() {
+ long rs = 0;
+ for (AtomicLong data : sourceTaskTimesTotal.values()) {
+ rs += data.get();
+ }
+ return rs;
+ }
+
+ public long sinkTaskTimesTotal() {
+ long rs = 0;
+ for (AtomicLong data : sinkTaskTimesTotal.values()) {
+ rs += data.get();
+ }
+ return rs;
+ }
+
+ private String getFormatRuntime() {
+ final long millisecond = 1;
+ final long second = 1000 * millisecond;
+ final long minute = 60 * second;
+ final long hour = 60 * minute;
+ final long day = 24 * hour;
+ final MessageFormat messageFormat = new MessageFormat("[ {0} days, {1} hours, {2} minutes, {3} seconds ]");
+
+ long time = System.currentTimeMillis() - this.connectBootTimestamp;
+ long days = time / day;
+ long hours = (time % day) / hour;
+ long minutes = (time % hour) / minute;
+ long seconds = (time % minute) / second;
+ return messageFormat.format(new Long[] {days, hours, minutes, seconds});
+ }
+
+
+ private String getSourceTaskTps() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append(this.getSourceTaskTps(10));
+ sb.append(" ");
+
+ sb.append(this.getSourceTaskTps(60));
+ sb.append(" ");
+
+ sb.append(this.getSourceTaskTps(600));
+
+ return sb.toString();
+ }
+
+ private String getSinkTaskTps() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append(this.getSinkTaskTps(10));
+ sb.append(" ");
+
+ sb.append(this.getSinkTaskTps(60));
+ sb.append(" ");
+
+ sb.append(this.getSinkTaskTps(600));
+
+ return sb.toString();
+ }
+
+ private String getTotalTps() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append(this.getTotalTps(10));
+ sb.append(" ");
+
+ sb.append(this.getTotalTps(60));
+ sb.append(" ");
+
+ sb.append(this.getTotalTps(600));
+
+ return sb.toString();
+ }
+
+ private String getSourceTaskTps(int time) {
+ String result = "";
+ this.lockSampling.lock();
+ try {
+ CallSnapshot last = this.sourceTaskTimesList.getLast();
+
+ if (this.sourceTaskTimesList.size() > time) {
+ CallSnapshot lastBefore = this.sourceTaskTimesList.get(this.sourceTaskTimesList.size() - (time + 1));
+ result += CallSnapshot.getTPS(lastBefore, last);
+ }
+
+ } finally {
+ this.lockSampling.unlock();
+ }
+ return result;
+ }
+ private String getSinkTaskTps(int time) {
+ String result = "";
+ this.lockSampling.lock();
+ try {
+ CallSnapshot last = this.sinkTaskTimesList.getLast();
+
+ if (this.sinkTaskTimesList.size() > time) {
+ CallSnapshot lastBefore = this.sinkTaskTimesList.get(this.sinkTaskTimesList.size() - (time + 1));
+ result += CallSnapshot.getTPS(lastBefore, last);
+ }
+
+ } finally {
+ this.lockSampling.unlock();
+ }
+ return result;
+ }
+
+ private String getTotalTps(int time) {
+ this.lockSampling.lock();
+ double source = 0;
+ double sink = 0;
+ try {
+ {
+ CallSnapshot last = this.sourceTaskTimesList.getLast();
+
+ if (this.sourceTaskTimesList.size() > time) {
+ CallSnapshot lastBefore = this.sourceTaskTimesList.get(this.sourceTaskTimesList.size() - (time + 1));
+ source += CallSnapshot.getTPS(lastBefore, last);
+ }
+ }
+ {
+ CallSnapshot last = this.sinkTaskTimesList.getLast();
+
+ if (this.sinkTaskTimesList.size() > time) {
+ CallSnapshot lastBefore = this.sinkTaskTimesList.get(this.sinkTaskTimesList.size() - (time + 1));
+ sink += CallSnapshot.getTPS(lastBefore, last);
+ }
+ }
+ } finally {
+ this.lockSampling.unlock();
+ }
+ return Double.toString(source + sink);
+ }
+
+ public HashMap<String, String> getRuntimeInfo() {
+ HashMap<String, String> result = new HashMap<String, String>(64);
+
+ Long totalTimes = sourceTaskTimesTotal();
+ if (0 == totalTimes) {
+ totalTimes = 1L;
+ }
+
+ Long sinktotalTimes = sinkTaskTimesTotal();
+ if (0 == sinktotalTimes) {
+ sinktotalTimes = 1L;
+ }
+
+ result.put("bootTimestamp", String.valueOf(this.connectBootTimestamp));
+ result.put("runtime", this.getFormatRuntime());
+ result.put("sourceTaskTimesTotal", String.valueOf(totalTimes));
+ result.put("sinkTaskTimesTotal", String.valueOf(sinktotalTimes));
+ result.put("sourceTaskTps", String.valueOf(this.getSourceTaskTps()));
+ result.put("sinkTaskTps", String.valueOf(this.getSinkTaskTps()));
+ return result;
+ }
+
+ public void run() {
+ log.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ this.waitForRunning(FREQUENCY_OF_SAMPLING);
+
+ this.sampling();
+
+ this.printTps();
+ } catch (Exception e) {
+ log.warn(this.getServiceName() + " service has exception. ", e);
+ }
+ }
+
+ log.info(this.getServiceName() + " service end");
+ }
+
+ @Override
+ public String getServiceName() {
+ return ConnectStatsService.class.getSimpleName();
+ }
+
+ private void sampling() {
+ this.lockSampling.lock();
+ try {
+ this.sourceTaskTimesList.add(new CallSnapshot(System.currentTimeMillis(), sourceTaskTimesTotal()));
+ if (this.sourceTaskTimesList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
+ this.sourceTaskTimesList.removeFirst();
+ }
+
+ this.sinkTaskTimesList.add(new CallSnapshot(System.currentTimeMillis(), sinkTaskTimesTotal()));
+ if (this.sourceTaskTimesList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
+ this.sourceTaskTimesList.removeFirst();
+ }
+ } finally {
+ this.lockSampling.unlock();
+ }
+ }
+
+ private void printTps() {
+ if (System.currentTimeMillis() > (this.lastPrintTimestamp + printTPSInterval * 1000)) {
+ this.lastPrintTimestamp = System.currentTimeMillis();
+
+ log.info("[CONNECTPS] source_task_tps {} sink_task_tps {}",
+ this.getSourceTaskTps(printTPSInterval),
+ this.getSinkTaskTps(printTPSInterval)
+ );
+ }
+ }
+
+ public AtomicLong singleSourceTaskTimesTotal(String taskId) {
+ if (StringUtils.isBlank(taskId)) {
+ return null;
+ }
+ AtomicLong rs = sourceTaskTimesTotal.get(taskId);
+ if (null == rs) {
+ rs = new AtomicLong(0);
+ AtomicLong previous = sourceTaskTimesTotal.putIfAbsent(taskId, rs);
+ if (previous != null) {
+ rs = previous;
+ }
+ }
+ return rs;
+ }
+
+ public AtomicLong singleSinkTaskTimesTotal(String taskId) {
+ if (StringUtils.isBlank(taskId)) {
+ return null;
+ }
+ AtomicLong rs = sinkTaskTimesTotal.get(taskId);
+ if (null == rs) {
+ rs = new AtomicLong(0);
+ AtomicLong previous = sinkTaskTimesTotal.putIfAbsent(taskId, rs);
+ if (previous != null) {
+ rs = previous;
+ }
+ }
+ return rs;
+ }
+
+ public Map<String, AtomicLong> getSourceTaskTimesTotal() {
+ return sourceTaskTimesTotal;
+ }
+
+ public Map<String, AtomicLong> getSinkTaskTimesTotal() {
+ return sinkTaskTimesTotal;
+ }
+
+
+ static class CallSnapshot {
+ public final long timestamp;
+ public final long callTimesTotal;
+
+ public CallSnapshot(long timestamp, long callTimesTotal) {
+ this.timestamp = timestamp;
+ this.callTimesTotal = callTimesTotal;
+ }
+
+ public static double getTPS(final CallSnapshot begin, final CallSnapshot end) {
+ long total = end.callTimesTotal - begin.callTimesTotal;
+ Long time = end.timestamp - begin.timestamp;
+
+ double tps = total / time.doubleValue();
+
+ return tps * 1000;
+ }
+ }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index ca49600..7013110 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -26,6 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -36,6 +38,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
@@ -49,6 +52,10 @@ import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTas
public class ConnectUtil {
+ private final static AtomicLong GROUP_POSTFIX_ID = new AtomicLong(0);
+
+ private static final String SYS_TASK_CG_PREFIX = "connect-";
+
public static String createGroupName(String prefix) {
StringBuilder sb = new StringBuilder();
sb.append(prefix).append("-");
@@ -208,4 +215,24 @@ public class ConnectUtil {
return recordPartition;
}
+
+ public static DefaultMQPullConsumer initDefaultMQPullConsumer(ConnectConfig connectConfig, String connectorName, ConnectKeyValue keyValue) {
+ RPCHook rpcHook = null;
+ if (connectConfig.getAclEnable()) {
+ rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+ }
+ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rpcHook);
+ consumer.setInstanceName(createInstance(connectorName));
+ String taskGroupId = keyValue.getString("task-group-id");
+ if (StringUtils.isNotBlank(taskGroupId)) {
+ consumer.setConsumerGroup(taskGroupId);
+ } else {
+ consumer.setConsumerGroup(SYS_TASK_CG_PREFIX + connectorName);
+ }
+ if (StringUtils.isNotBlank(connectConfig.getNamesrvAddr())) {
+ consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+ }
+ return consumer;
+ }
+
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java
index c8f81ba..b2bf34c 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java
@@ -115,6 +115,7 @@ public class PluginUtils {
+ "|org\\.w3c\\.dom"
+ "|org\\.xml\\.sax"
+ "|io\\.openmessaging\\.connector\\.api"
+ + "|org\\.slf4j"
+ ")\\..*$"
+ "|io\\.openmessaging\\.KeyValue");
diff --git a/rocketmq-connect-runtime/src/main/resources/logback.xml b/rocketmq-connect-runtime/src/main/resources/logback.xml
index 6d9bc75..ace099a 100644
--- a/rocketmq-connect-runtime/src/main/resources/logback.xml
+++ b/rocketmq-connect-runtime/src/main/resources/logback.xml
@@ -78,6 +78,24 @@
</encoder>
</appender>
+ <appender name="RocketmqConnectStatsAppender"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${user.home}/logs/rocketmqconnect/connect_stats.log</file>
+ <append>true</append>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>${user.home}/logs/rocketmqconnect/otherdays/connect_stats.%i.log.gz</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>20</maxIndex>
+ </rollingPolicy>
+ <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>128MB</maxFileSize>
+ </triggeringPolicy>
+ <encoder>
+ <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+ <charset class="java.nio.charset.Charset">UTF-8</charset>
+ </encoder>
+ </appender>
+
<appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index b9d08aa..a9c51ed 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -38,7 +38,10 @@ import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnect
import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConverter;
import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestPositionManageServiceImpl;
import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSourceTask;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.Plugin;
import org.apache.rocketmq.connect.runtime.utils.TestUtils;
import org.junit.After;
@@ -60,6 +63,9 @@ public class WorkerTest {
private PositionManagementService offsetManagementService;
@Mock
+ private ConfigManagementService configManagementService;
+
+ @Mock
private DefaultMQProducer producer;
private ConnectConfig connectConfig;
@@ -75,13 +81,19 @@ public class WorkerTest {
@Mock
private ConnectController connectController;
+ @Mock
+ private ConnectStatsManager connectStatsManager;
+
+ @Mock
+ private ConnectStatsService connectStatsService;
+
@Before
public void init() {
connectConfig = new ConnectConfig();
connectConfig.setHttpPort(8081);
connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
connectConfig.setNamesrvAddr("localhost:9876");
- worker = new Worker(connectConfig, positionManagementService, offsetManagementService, plugin);
+ worker = new Worker(connectConfig, positionManagementService, configManagementService, plugin, connectController);
Set<WorkerConnector> workingConnectors = new HashSet<>();
for (int i = 0; i < 3; i++) {
@@ -105,6 +117,7 @@ public class WorkerTest {
new TestConverter(),
producer,
new AtomicReference(WorkerState.STARTED),
+ connectStatsManager, connectStatsService,
transformChain));
}
worker.setWorkingTasks(runnables);
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index 02dcd04..c3f5e56 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -53,6 +53,8 @@ import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
import org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.Plugin;
import org.junit.Before;
import org.junit.Test;
@@ -132,6 +134,12 @@ public class RestHandlerTest {
private AtomicReference<WorkerState> workerState;
+ @Mock
+ private ConnectStatsManager connectStatsManager;
+
+ @Mock
+ private ConnectStatsService connectStatsService;
+
@Before
public void init() throws Exception {
workerState = new AtomicReference<>(WorkerState.STARTED);
@@ -198,8 +206,8 @@ public class RestHandlerTest {
}
};
TransformChain<ConnectRecord> transformChain = new TransformChain<ConnectRecord>(new DefaultKeyValue(), new Plugin(new ArrayList<>()));
- WorkerSourceTask workerSourceTask1 = new WorkerSourceTask("testConnectorName1", sourceTask, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, transformChain);
- WorkerSourceTask workerSourceTask2 = new WorkerSourceTask("testConnectorName2", sourceTask, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, transformChain);
+ WorkerSourceTask workerSourceTask1 = new WorkerSourceTask("testConnectorName1", sourceTask, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain);
+ WorkerSourceTask workerSourceTask2 = new WorkerSourceTask("testConnectorName2", sourceTask, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain);
workerTasks = new HashSet<Runnable>() {
{
add(workerSourceTask1);
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java
index e7a8f92..08a8964 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java
@@ -17,16 +17,12 @@
package org.apache.rocketmq.connect.file;
-import com.alibaba.fastjson.JSON;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
import io.openmessaging.connector.api.data.ConnectRecord;
-import io.openmessaging.connector.api.data.Field;
-import io.openmessaging.connector.api.data.FieldType;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
-import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.errors.ConnectException;
import java.io.IOException;
import java.io.PrintStream;
@@ -53,19 +49,7 @@ public class FileSinkTask extends SinkTask {
for (ConnectRecord record : sinkDataEntries) {
Object payload = record.getData();
log.trace("Writing line to {}: {}", logFilename(), payload);
- Schema schema = record.getSchema();
- if (null == schema || null == schema.getFieldType()) {
- log.warn("error record {}", JSON.toJSONString(record));
- continue;
- }
- List<Field> fields = schema.getFields();
- for (Field field : fields) {
- FieldType type = schema.getFieldType();
- if (type.equals(FieldType.STRING)) {
- log.info("Writing line to {}: {}", logFilename(), payload);
- outputStream.println(payload);
- }
- }
+ outputStream.println(payload);
}
}
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
index 74b71b7..b534fd7 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
@@ -85,7 +85,7 @@ public class FileSourceTask extends SourceTask {
}
log.debug("Skipped to offset {}", lastRecordedOffset);
}
- streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
+ streamOffset = (lastRecordedOffset != null) ? Long.valueOf(String.valueOf(lastRecordedOffset)) : 0L;
} else {
log.info("positionInfo is null!");
streamOffset = 0L;
@@ -203,6 +203,7 @@ public class FileSourceTask extends SourceTask {
@Override public void start(SourceTaskContext sourceTaskContext) {
+ this.sourceTaskContext = sourceTaskContext;
fileConfig = new FileConfig();
fileConfig.load(config);
log.info("fileName is:{}", fileConfig.getFilename());
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java
new file mode 100644
index 0000000..31f57a6
--- /dev/null
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.file;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.ComponentContext;
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FilterTransform implements Transform<ConnectRecord> {
+
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.FILE_CONNECTOR);
+
+ private KeyValue keyValue;
+
+ @Override public ConnectRecord doTransform(ConnectRecord record) {
+ Object data = record.getData();
+ String s = String.valueOf(data);
+ s = s + ":filter";
+ record.setData(s);
+ return record;
+ }
+
+ @Override public void validate(KeyValue config) {
+
+ }
+
+ @Override public void init(KeyValue config) {
+ this.keyValue = config;
+ log.info("transform config {}", this.keyValue);
+ }
+
+ @Override public void start(ComponentContext componentContext) {
+
+ }
+
+ @Override public void stop() {
+
+ }
+}