You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/07/28 05:48:04 UTC
[rocketmq-connect] branch master updated: [ISSUE #183] Optimize worker task (#201)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 26ef4e5 [ISSUE #183] Optimize worker task (#201)
26ef4e5 is described below
commit 26ef4e52c3178c2e06e6ac0f4ea16afb5aa5db7b
Author: xiaoyi <su...@163.com>
AuthorDate: Thu Jul 28 13:47:59 2022 +0800
[ISSUE #183] Optimize worker task (#201)
* Optimize WorkerSourceTask and WorkerSinkTask #183
* Optimize WorkerSourceTask and WorkerSinkTask #183
* Optimize worker #183
* fixed
* fixed
* format code
* fixed
* fixed WorkerTask #183
* Fix debezium decimal type conversion problem #190
* Upgrade rocketmq-replicator API to v0.1.3 #189
* Encountered change event for table databasename.tablename whose schema isn't known to this connector #191
* Debezium mysql source connector delete event causes null pointer #196
* remove local config
* Debezium mysql source connector delete event causes null pointer #196
* Ensure the orderly submission of offset #183
* fixed
* Optimize WorkerDirectTask #183
* fixed #183
* test offset commit #183
* fixed #183
* fixed task id #183
* fixed
* upgrade fastjson
* reformat code #183
* Whether the same task is assigned to different worker nodes with different connectorTaskId [fixed]
* fixed
* Rocketmq replicator running null pointer #205
---
connectors/rocketmq-connect-jdbc/pom.xml | 7 +-
pom.xml | 4 +-
.../connect/runtime/config/ConnectConfig.java | 121 +++-
.../runtime/config/SinkConnectorConfig.java | 2 -
.../connectorwrapper/RecordOffsetManagement.java | 285 +++++++++
.../SourceTaskOffsetCommitter.java | 113 ++++
.../connect/runtime/connectorwrapper/Worker.java | 540 +++++++++--------
.../runtime/connectorwrapper/WorkerDirectTask.java | 410 +++++++------
.../runtime/connectorwrapper/WorkerSinkTask.java | 500 +++++++--------
.../connectorwrapper/WorkerSinkTaskContext.java | 2 +-
.../runtime/connectorwrapper/WorkerSourceTask.java | 673 +++++++++++++--------
.../connectorwrapper/WorkerSourceTaskContext.java | 60 ++
.../runtime/connectorwrapper/WorkerTask.java | 184 +++++-
.../connect/runtime/errors/ErrorReporter.java | 6 +
.../runtime/errors/RetryWithToleranceOperator.java | 18 +
.../rocketmq/connect/runtime/rest/RestHandler.java | 2 +-
.../service/AbstractConfigManagementService.java | 80 +++
.../service/ConfigManagementServiceImpl.java | 43 +-
.../service/PositionManagementServiceImpl.java | 1 -
.../memory/MemoryConfigManagementServiceImpl.java | 49 +-
.../runtime/store/PositionStorageWriter.java | 156 ++++-
.../connect/runtime/utils/ConnectUtil.java | 6 +-
.../connect/runtime/utils/ConnectorTaskId.java | 83 +++
.../connect/runtime/utils/CurrentTaskState.java | 79 +++
.../runtime/connectorwrapper/WorkerTest.java | 7 +-
.../connect/runtime/rest/RestHandlerTest.java | 5 +-
26 files changed, 2326 insertions(+), 1110 deletions(-)
diff --git a/connectors/rocketmq-connect-jdbc/pom.xml b/connectors/rocketmq-connect-jdbc/pom.xml
index c4aed33..7617043 100644
--- a/connectors/rocketmq-connect-jdbc/pom.xml
+++ b/connectors/rocketmq-connect-jdbc/pom.xml
@@ -40,7 +40,7 @@
<!-- Compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
- <rocketmq.version>4.5.2</rocketmq.version>
+ <rocketmq.version>4.7.0</rocketmq.version>
<!--test jar-->
<junit.version>4.13.1</junit.version>
@@ -285,11 +285,6 @@
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
- <archive>
- <manifest>
- <mainClass>org.apache.rocketmq.connect.redis.connector.RedisSourceConnector</mainClass>
- </manifest>
- </archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
diff --git a/pom.xml b/pom.xml
index cb8b222..64b719a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,12 +38,12 @@
</license>
</licenses>
<properties>
- <rocketmq.version>4.4.0</rocketmq.version>
+ <rocketmq.version>4.7.0</rocketmq.version>
<junit.version>4.13.1</junit.version>
<assertj.version>2.6.0</assertj.version>
<mockito.version>3.2.4</mockito.version>
<httpclient.version>4.5.13</httpclient.version>
- <openmessaging.connector.version>0.1.3</openmessaging.connector.version>
+ <openmessaging.connector.version>0.1.4-SNAPSHOT</openmessaging.connector.version>
<fastjson.version>1.2.83</fastjson.version>
<javalin.version>2.8.0</javalin.version>
<slf4j.version>1.7.7</slf4j.version>
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
index 45bf3ac..8710cbb 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
@@ -17,11 +17,12 @@
package org.apache.rocketmq.connect.runtime.config;
-import java.io.File;
import org.apache.rocketmq.common.MixAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+
import static org.apache.rocketmq.connect.runtime.common.LoggerName.ROCKETMQ_RUNTIME;
/**
@@ -69,6 +70,16 @@ public class ConnectConfig {
private int rmqMinConsumeThreadNums = 1;
+ // task start timeout in milliseconds, default 3 minutes
+ private long maxStartTimeoutMills = 1000 * 60 * 3;
+
+ // task stop timeout in milliseconds, default 1 minute
+ private long maxStopTimeoutMills = 1000 * 60;
+
+ // offset commit timeout in milliseconds, default 30 seconds
+ private long offsetCommitTimeoutMsConfig = 1000 * 30;
+
+
public int getBrokerSuspendMaxTimeMillis() {
return brokerSuspendMaxTimeMillis;
}
@@ -137,6 +148,12 @@ public class ConnectConfig {
private String adminExtGroup = "connector-admin-group";
+ /**
+ * offset commit interval in milliseconds, default 5 seconds
+ */
+ private long offsetCommitIntervalMs = 5000L;
+
+
public String getWorkerId() {
return workerId;
}
@@ -369,38 +386,76 @@ public class ConnectConfig {
this.connectHome = connectHome;
}
- @Override public String toString() {
+
+ public long getOffsetCommitIntervalMs() {
+ return offsetCommitIntervalMs;
+ }
+
+ public void setOffsetCommitIntervalMs(long offsetCommitIntervalMs) {
+ this.offsetCommitIntervalMs = offsetCommitIntervalMs;
+ }
+
+ public long getMaxStartTimeoutMills() {
+ return maxStartTimeoutMills;
+ }
+
+ public void setMaxStartTimeoutMills(long maxStartTimeoutMills) {
+ this.maxStartTimeoutMills = maxStartTimeoutMills;
+ }
+
+ public long getMaxStopTimeoutMills() {
+ return maxStopTimeoutMills;
+ }
+
+ public void setMaxStopTimeoutMills(long maxStopTimeoutMills) {
+ this.maxStopTimeoutMills = maxStopTimeoutMills;
+ }
+
+ public long getOffsetCommitTimeoutMsConfig() {
+ return offsetCommitTimeoutMsConfig;
+ }
+
+ public void setOffsetCommitTimeoutMsConfig(long offsetCommitTimeoutMsConfig) {
+ this.offsetCommitTimeoutMsConfig = offsetCommitTimeoutMsConfig;
+ }
+
+ @Override
+ public String toString() {
return "ConnectConfig{" +
- "workerId='" + workerId + '\'' +
- ", storePathRootDir='" + storePathRootDir + '\'' +
- ", connectHome='" + connectHome + '\'' +
- ", namesrvAddr='" + namesrvAddr + '\'' +
- ", rmqProducerGroup='" + rmqProducerGroup + '\'' +
- ", maxMessageSize=" + maxMessageSize +
- ", operationTimeout=" + operationTimeout +
- ", rmqConsumerGroup='" + rmqConsumerGroup + '\'' +
- ", rmqMaxRedeliveryTimes=" + rmqMaxRedeliveryTimes +
- ", rmqMessageConsumeTimeout=" + rmqMessageConsumeTimeout +
- ", rmqMaxConsumeThreadNums=" + rmqMaxConsumeThreadNums +
- ", rmqMinConsumeThreadNums=" + rmqMinConsumeThreadNums +
- ", brokerSuspendMaxTimeMillis=" + brokerSuspendMaxTimeMillis +
- ", clusterStoreTopic='" + clusterStoreTopic + '\'' +
- ", configStoreTopic='" + configStoreTopic + '\'' +
- ", positionStoreTopic='" + positionStoreTopic + '\'' +
- ", offsetStoreTopic='" + offsetStoreTopic + '\'' +
- ", httpPort=" + httpPort +
- ", positionPersistInterval=" + positionPersistInterval +
- ", offsetPersistInterval=" + offsetPersistInterval +
- ", configPersistInterval=" + configPersistInterval +
- ", pluginPaths='" + pluginPaths + '\'' +
- ", connectClusterId='" + connectClusterId + '\'' +
- ", allocTaskStrategy='" + allocTaskStrategy + '\'' +
- ", aclEnable=" + aclEnable +
- ", accessKey='" + accessKey + '\'' +
- ", secretKey='" + secretKey + '\'' +
- ", autoCreateGroupEnable=" + autoCreateGroupEnable +
- ", clusterName='" + clusterName + '\'' +
- ", adminExtGroup='" + adminExtGroup + '\'' +
- '}';
+ "workerId='" + workerId + '\'' +
+ ", storePathRootDir='" + storePathRootDir + '\'' +
+ ", connectHome='" + connectHome + '\'' +
+ ", namesrvAddr='" + namesrvAddr + '\'' +
+ ", rmqProducerGroup='" + rmqProducerGroup + '\'' +
+ ", maxMessageSize=" + maxMessageSize +
+ ", operationTimeout=" + operationTimeout +
+ ", rmqConsumerGroup='" + rmqConsumerGroup + '\'' +
+ ", rmqMaxRedeliveryTimes=" + rmqMaxRedeliveryTimes +
+ ", rmqMessageConsumeTimeout=" + rmqMessageConsumeTimeout +
+ ", rmqMaxConsumeThreadNums=" + rmqMaxConsumeThreadNums +
+ ", rmqMinConsumeThreadNums=" + rmqMinConsumeThreadNums +
+ ", maxStartTimeoutMills=" + maxStartTimeoutMills +
+ ", maxStopTimeoutMills=" + maxStopTimeoutMills +
+ ", offsetCommitTimeoutMsConfig=" + offsetCommitTimeoutMsConfig +
+ ", brokerSuspendMaxTimeMillis=" + brokerSuspendMaxTimeMillis +
+ ", clusterStoreTopic='" + clusterStoreTopic + '\'' +
+ ", configStoreTopic='" + configStoreTopic + '\'' +
+ ", positionStoreTopic='" + positionStoreTopic + '\'' +
+ ", offsetStoreTopic='" + offsetStoreTopic + '\'' +
+ ", httpPort=" + httpPort +
+ ", positionPersistInterval=" + positionPersistInterval +
+ ", offsetPersistInterval=" + offsetPersistInterval +
+ ", configPersistInterval=" + configPersistInterval +
+ ", pluginPaths='" + pluginPaths + '\'' +
+ ", connectClusterId='" + connectClusterId + '\'' +
+ ", allocTaskStrategy='" + allocTaskStrategy + '\'' +
+ ", aclEnable=" + aclEnable +
+ ", accessKey='" + accessKey + '\'' +
+ ", secretKey='" + secretKey + '\'' +
+ ", autoCreateGroupEnable=" + autoCreateGroupEnable +
+ ", clusterName='" + clusterName + '\'' +
+ ", adminExtGroup='" + adminExtGroup + '\'' +
+ ", offsetCommitIntervalMs=" + offsetCommitIntervalMs +
+ '}';
}
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java
index f533093..b513078 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java
@@ -29,8 +29,6 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
public class SinkConnectorConfig extends ConnectConfig {
-
-
public static Set<String> parseTopicList(ConnectKeyValue taskConfig) {
String messageQueueStr = taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME);
if (StringUtils.isBlank(messageQueueStr)) {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/RecordOffsetManagement.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/RecordOffsetManagement.java
new file mode 100644
index 0000000..c17ade1
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/RecordOffsetManagement.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.RecordPosition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * record offset management
+ */
+class RecordOffsetManagement {
+
+ private static final Logger log = LoggerFactory.getLogger(RecordOffsetManagement.class);
+
+ final Map<RecordPartition, Deque<SubmittedPosition>> records = new HashMap<>();
+ private AtomicInteger numUnackedMessages = new AtomicInteger(0);
+ private CountDownLatch messageDrainLatch;
+
+ public RecordOffsetManagement() {
+ }
+
+ /**
+ * submit record
+ *
+ * @param position
+ * @return
+ */
+ public SubmittedPosition submitRecord(RecordPosition position) {
+ SubmittedPosition submittedPosition = new SubmittedPosition(position);
+ records.computeIfAbsent(position.getPartition(), e -> new LinkedList<>()).add(submittedPosition);
+ // hold the lock so the unacked count cannot change while awaitAllMessages sizes its drain latch
+ synchronized (this) {
+ numUnackedMessages.incrementAndGet();
+ }
+ return submittedPosition;
+ }
+
+ /**
+ * await all messages
+ *
+ * @param timeout
+ * @param timeUnit
+ * @return
+ */
+ public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
+ // Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization
+ // on an instance variable when invoking CountDownLatch::await outside a synchronized block
+ CountDownLatch messageDrainLatch;
+ synchronized (this) {
+ messageDrainLatch = new CountDownLatch(numUnackedMessages.get());
+ this.messageDrainLatch = messageDrainLatch;
+ }
+ try {
+ return messageDrainLatch.await(timeout, timeUnit);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+
+ /**
+ * @param submittedPositions
+ * @return
+ */
+ private RecordOffset pollOffsetWhile(Deque<SubmittedPosition> submittedPositions) {
+ RecordOffset offset = null;
+ // Poll acked records off the head of the deque; stop at the first record that has not been acked
+ while (canCommitHead(submittedPositions)) {
+ offset = submittedPositions.poll().getPosition().getOffset();
+ }
+ return offset;
+ }
+
+ private boolean canCommitHead(Deque<SubmittedPosition> submittedPositions) {
+ return submittedPositions.peek() != null && submittedPositions.peek().getAcked();
+ }
+
+ public CommittableOffsets committableOffsets() {
+ Map<RecordPartition, RecordOffset> offsets = new HashMap<>();
+ int totalCommittableMessages = 0;
+ int totalUncommittableMessages = 0;
+ int largestDequeSize = 0;
+ RecordPartition largestDequePartition = null;
+ for (Map.Entry<RecordPartition, Deque<SubmittedPosition>> entry : records.entrySet()) {
+ RecordPartition partition = entry.getKey();
+ Deque<SubmittedPosition> queuedRecords = entry.getValue();
+ int initialDequeSize = queuedRecords.size();
+ if (canCommitHead(queuedRecords)) {
+ RecordOffset offset = pollOffsetWhile(queuedRecords);
+ offsets.put(partition, offset);
+ }
+ // messages still queued after polling, i.e. not yet committable
+ int uncommittableMessages = queuedRecords.size();
+ // messages polled off the deque in this pass, i.e. committable
+ int committableMessages = initialDequeSize - uncommittableMessages;
+
+ // calc total
+ totalCommittableMessages += committableMessages;
+ totalUncommittableMessages += uncommittableMessages;
+
+ if (uncommittableMessages > largestDequeSize) {
+ largestDequeSize = uncommittableMessages;
+ largestDequePartition = partition;
+ }
+ }
+ // Clear out all empty deques from the map to keep it from growing indefinitely
+ records.values().removeIf(Deque::isEmpty);
+ return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition);
+ }
+
+
+ // Synchronize in order to ensure that the number of unacknowledged messages isn't modified in the middle of a call
+ // to awaitAllMessages (which might cause us to decrement first, then create a new message drain latch, then count down
+ // that latch here, effectively double-acking the message)
+ private synchronized void messageAcked() {
+ numUnackedMessages.decrementAndGet();
+ if (messageDrainLatch != null) {
+ messageDrainLatch.countDown();
+ }
+ }
+
+ public class SubmittedPosition {
+ private final RecordPosition position;
+ private final AtomicBoolean acked;
+
+ public SubmittedPosition(RecordPosition position) {
+ this.position = position;
+ acked = new AtomicBoolean(false);
+ }
+
+
+ /**
+ * Acknowledge this record; signals that its offset may be safely committed.
+ */
+ public void ack() {
+ if (this.acked.compareAndSet(false, true)) {
+ messageAcked();
+ }
+ }
+
+ /**
+ * remove record
+ *
+ * @return
+ */
+ public boolean remove() {
+ Deque<SubmittedPosition> deque = records.get(position.getPartition());
+ if (deque == null) {
+ return false;
+ }
+ boolean result = deque.removeLastOccurrence(this);
+ if (deque.isEmpty()) {
+ records.remove(position.getPartition());
+ }
+ if (result) {
+ messageAcked();
+ } else {
+ log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", position.getPartition());
+ }
+ return result;
+ }
+
+ public RecordPosition getPosition() {
+ return position;
+ }
+
+ public Boolean getAcked() {
+ return acked.get();
+ }
+ }
+
+
+ /**
+ * Contains a snapshot of offsets that can be committed for a source task and metadata for that offset commit
+ * (such as the number of messages for which offsets can and cannot be committed).
+ */
+ static class CommittableOffsets {
+
+ /**
+ * An "empty" snapshot that contains no offsets to commit and whose metadata contains no committable or uncommittable messages.
+ */
+ public static final CommittableOffsets EMPTY = new CommittableOffsets(Collections.emptyMap(), 0, 0, 0, 0, null);
+
+ private final Map<RecordPartition, RecordOffset> offsets;
+ private final RecordPartition largestDequePartition;
+ private final int numCommittableMessages;
+ private final int numUncommittableMessages;
+ private final int numDeques;
+ private final int largestDequeSize;
+
+
+ CommittableOffsets(
+ Map<RecordPartition, RecordOffset> offsets,
+ int numCommittableMessages,
+ int numUncommittableMessages,
+ int numDeques,
+ int largestDequeSize,
+ RecordPartition largestDequePartition
+ ) {
+ this.offsets = offsets != null ? new HashMap<>(offsets) : Collections.emptyMap();
+ this.numCommittableMessages = numCommittableMessages;
+ this.numUncommittableMessages = numUncommittableMessages;
+ this.numDeques = numDeques;
+ this.largestDequeSize = largestDequeSize;
+ this.largestDequePartition = largestDequePartition;
+ }
+
+ public Map<RecordPartition, RecordOffset> offsets() {
+ return Collections.unmodifiableMap(offsets);
+ }
+
+ public int numCommittableMessages() {
+ return numCommittableMessages;
+ }
+
+ public int numUncommittableMessages() {
+ return numUncommittableMessages;
+ }
+
+ public int numDeques() {
+ return numDeques;
+ }
+
+ public int largestDequeSize() {
+ return largestDequeSize;
+ }
+
+ public RecordPartition largestDequePartition() {
+ return largestDequePartition;
+ }
+
+ public boolean hasPending() {
+ return numUncommittableMessages > 0;
+ }
+
+
+ public boolean isEmpty() {
+ return numCommittableMessages == 0 && numUncommittableMessages == 0 && offsets.isEmpty();
+ }
+
+
+ public CommittableOffsets updatedWith(CommittableOffsets newerOffsets) {
+ Map<RecordPartition, RecordOffset> offsets = new HashMap<>(this.offsets);
+ offsets.putAll(newerOffsets.offsets);
+
+ return new CommittableOffsets(
+ offsets,
+ this.numCommittableMessages + newerOffsets.numCommittableMessages,
+ newerOffsets.numUncommittableMessages,
+ newerOffsets.numDeques,
+ newerOffsets.largestDequeSize,
+ newerOffsets.largestDequePartition
+ );
+ }
+ }
+
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/SourceTaskOffsetCommitter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/SourceTaskOffsetCommitter.java
new file mode 100644
index 0000000..9f1491c
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/SourceTaskOffsetCommitter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * source task offset committer
+ */
+class SourceTaskOffsetCommitter {
+ private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
+
+ private final ConnectConfig config;
+ private final ScheduledExecutorService commitExecutorService;
+ private final ConcurrentMap<ConnectorTaskId, ScheduledFuture<?>> committers;
+
+
+ SourceTaskOffsetCommitter(ConnectConfig config,
+ ScheduledExecutorService commitExecutorService,
+ ConcurrentMap<ConnectorTaskId, ScheduledFuture<?>> committers) {
+ this.config = config;
+ this.commitExecutorService = commitExecutorService;
+ this.committers = committers;
+ }
+
+ public SourceTaskOffsetCommitter(ConnectConfig config) {
+ this(config, Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory(
+ SourceTaskOffsetCommitter.class.getSimpleName() + "-%d", false)),
+ new ConcurrentHashMap<>());
+ }
+
+ public void close(long timeoutMs) {
+ commitExecutorService.shutdown();
+ try {
+ if (!commitExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+ log.error("Graceful shutdown of offset commitOffsets thread timed out.");
+ }
+ } catch (InterruptedException e) {
+ // ignore and allow to exit immediately
+ }
+ }
+
+ public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) {
+ long commitIntervalMs = config.getOffsetCommitIntervalMs();
+ ScheduledFuture<?> commitFuture = commitExecutorService.scheduleWithFixedDelay(() -> {
+ commit(workerTask);
+ }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
+ committers.put(id, commitFuture);
+ }
+
+ public void remove(ConnectorTaskId id) {
+ final ScheduledFuture<?> task = committers.remove(id);
+ if (task == null) {
+ return;
+ }
+
+ try {
+ task.cancel(false);
+ if (!task.isDone()) {
+ task.get();
+ }
+ } catch (CancellationException e) {
+ // ignore
+ log.trace("Offset commit thread was cancelled by another thread while removing connector task with id: {}", id);
+ } catch (ExecutionException | InterruptedException e) {
+ throw new ConnectException("Unexpected interruption in SourceTaskOffsetCommitter while removing task with id: " + id, e);
+ }
+ }
+
+ private void commit(WorkerSourceTask workerTask) {
+ log.debug("{} Committing offsets", workerTask);
+ try {
+ if (workerTask.commitOffsets()) {
+ return;
+ }
+ log.error("{} Failed to commit offsets", workerTask);
+ } catch (Throwable t) {
+ // We're very careful about exceptions here since any uncaught exceptions in the commit
+ // thread would cause the fixed interval schedule on the ExecutorService to stop running
+ // for that task
+ log.error("{} Unhandled exception when committing: ", workerTask, t);
+ }
+ }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index 70bb7ad..dec3e88 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -25,47 +25,49 @@ import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.component.task.source.SourceTask;
import io.openmessaging.connector.api.data.ConnectRecord;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
import io.openmessaging.connector.api.data.RecordConverter;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
-import org.apache.rocketmq.connect.runtime.service.TaskPositionCommitService;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.utils.Plugin;
import org.apache.rocketmq.connect.runtime.utils.PluginClassLoader;
import org.apache.rocketmq.connect.runtime.utils.ServiceThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* A worker to schedule all connectors and tasks in a process.
*/
@@ -78,80 +80,60 @@ public class Worker {
private Set<WorkerConnector> workingConnectors = new ConcurrentSet<>();
/**
- * Current running tasks.
+ * Current tasks state.
*/
private Map<Runnable, Long/*timestamp*/> pendingTasks = new ConcurrentHashMap<>();
-
private Set<Runnable> runningTasks = new ConcurrentSet<>();
-
- private Set<Runnable> errorTasks = new ConcurrentSet<>();
-
- private Set<Runnable> cleanedErrorTasks = new ConcurrentSet<>();
-
private Map<Runnable, Long/*timestamp*/> stoppingTasks = new ConcurrentHashMap<>();
-
private Set<Runnable> stoppedTasks = new ConcurrentSet<>();
-
private Set<Runnable> cleanedStoppedTasks = new ConcurrentSet<>();
+ private Set<Runnable> errorTasks = new ConcurrentSet<>();
+ private Set<Runnable> cleanedErrorTasks = new ConcurrentSet<>();
Map<String, List<ConnectKeyValue>> latestTaskConfigs = new HashMap<>();
/**
* Current running tasks to its Future map.
*/
private Map<Runnable, Future> taskToFutureMap = new ConcurrentHashMap<>();
-
/**
* Thread pool for connectors and tasks.
*/
private final ExecutorService taskExecutor;
-
/**
* Position management for source tasks.
*/
private final PositionManagementService positionManagementService;
-
/**
* A scheduled task to commit source position of source tasks.
*/
- private final TaskPositionCommitService taskPositionCommitService;
-
- private final ConnectConfig connectConfig;
-
+// private final TaskPositionCommitService taskPositionCommitService;
+ private Optional<SourceTaskOffsetCommitter> sourceTaskOffsetCommitter;
+ private final ConnectConfig workerConfig;
private final Plugin plugin;
- private static final int MAX_START_TIMEOUT_MILLS = 1000 * 60;
-
- private static final long MAX_STOP_TIMEOUT_MILLS = 20000;
-
/**
* Atomic state variable
*/
private AtomicReference<WorkerState> workerState;
-
private StateMachineService stateMachineService = new StateMachineService();
-
private final ConnectStatsManager connectStatsManager;
-
private final ConnectStatsService connectStatsService;
- public Worker(ConnectConfig connectConfig,
+ public Worker(ConnectConfig workerConfig,
PositionManagementService positionManagementService,
ConfigManagementService configManagementService,
Plugin plugin, AbstractConnectController connectController) {
- this.connectConfig = connectConfig;
+ this.workerConfig = workerConfig;
this.taskExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("task-Worker-Executor-"));
this.positionManagementService = positionManagementService;
- this.taskPositionCommitService = new TaskPositionCommitService(
- this,
- positionManagementService);
this.plugin = plugin;
this.connectStatsManager = connectController.getConnectStatsManager();
this.connectStatsService = connectController.getConnectStatsService();
+ this.sourceTaskOffsetCommitter = Optional.of(new SourceTaskOffsetCommitter(workerConfig));
}
public void start() {
workerState = new AtomicReference<>(WorkerState.STARTED);
- taskPositionCommitService.start();
stateMachineService.start();
}
@@ -164,7 +146,7 @@ public class Worker {
* @throws Exception
*/
public synchronized void startConnectors(Map<String, ConnectKeyValue> connectorConfigs,
- AbstractConnectController connectController) throws Exception {
+ AbstractConnectController connectController) throws Exception {
Set<WorkerConnector> stoppedConnector = new HashSet<>();
for (WorkerConnector workerConnector : workingConnectors) {
try {
@@ -247,11 +229,11 @@ public class Worker {
for (Runnable runnable : set) {
ConnectKeyValue taskConfig = null;
if (runnable instanceof WorkerSourceTask) {
- taskConfig = ((WorkerSourceTask) runnable).getTaskConfig();
+ taskConfig = ((WorkerSourceTask) runnable).currentTaskConfig();
} else if (runnable instanceof WorkerSinkTask) {
- taskConfig = ((WorkerSinkTask) runnable).getTaskConfig();
+ taskConfig = ((WorkerSinkTask) runnable).currentTaskConfig();
} else if (runnable instanceof WorkerDirectTask) {
- taskConfig = ((WorkerDirectTask) runnable).getTaskConfig();
+ taskConfig = ((WorkerDirectTask) runnable).currentTaskConfig();
}
if (keyValue.equals(taskConfig)) {
return true;
@@ -267,19 +249,22 @@ public class Worker {
public void stop() {
workerState.set(WorkerState.TERMINATED);
try {
+ sourceTaskOffsetCommitter.ifPresent(committer -> committer.close(5000));
taskExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Task termination error.", e);
}
stateMachineService.shutdown();
+
}
+
public Set<WorkerConnector> getWorkingConnectors() {
return workingConnectors;
}
public void setWorkingConnectors(
- Set<WorkerConnector> workingConnectors) {
+ Set<WorkerConnector> workingConnectors) {
this.workingConnectors = workingConnectors;
}
@@ -326,83 +311,246 @@ public class Worker {
}
+ /**
+ * maintain task state
+ *
+ * @throws Exception
+ */
public void maintainTaskState() throws Exception {
-
- Map<String, List<ConnectKeyValue>> taskConfigs = new HashMap<>();
+ Map<String, List<ConnectKeyValue>> connectorConfig = new HashMap<>();
synchronized (latestTaskConfigs) {
- taskConfigs.putAll(latestTaskConfigs);
+ connectorConfig.putAll(latestTaskConfigs);
}
- boolean needCommitPosition = false;
+ // STEP 1: check running tasks and put to error status
+ checkRunningTasks(connectorConfig);
+
+ // get new Tasks
+ Map<String, List<ConnectKeyValue>> newTasks = newTasks(connectorConfig);
+
+ // STEP 2: try to create new tasks
+ startTask(newTasks);
+
+ // STEP 3: check all pending state
+ checkPendingTask();
+
+ // STEP 4 check stopping tasks
+ checkStoppingTasks();
+
+ // STEP 5 check error tasks
+ checkErrorTasks();
+
+ // STEP 6 check errorTasks and stopped tasks
+ checkStoppedTasks();
+ }
+
+ /**
+ * check running task
+ *
+ * @param connectorConfig
+ */
+ private void checkRunningTasks(Map<String, List<ConnectKeyValue>> connectorConfig) {
// STEP 1: check running tasks and put to error status
for (Runnable runnable : runningTasks) {
WorkerTask workerTask = (WorkerTask) runnable;
- String connectorName = workerTask.getConnectorName();
- ConnectKeyValue taskConfig = workerTask.getTaskConfig();
- List<ConnectKeyValue> keyValues = taskConfigs.get(connectorName);
+ String connectorName = workerTask.id().connector();
+ ConnectKeyValue taskConfig = workerTask.currentTaskConfig();
+ List<ConnectKeyValue> taskConfigs = connectorConfig.get(connectorName);
WorkerTaskState state = ((WorkerTask) runnable).getState();
-
- if (WorkerTaskState.ERROR == state) {
- errorTasks.add(runnable);
- runningTasks.remove(runnable);
- } else if (WorkerTaskState.RUNNING == state) {
- boolean needStop = true;
- if (null != keyValues && keyValues.size() > 0) {
- for (ConnectKeyValue keyValue : keyValues) {
- if (keyValue.equals(taskConfig)) {
- needStop = false;
- break;
+ switch (state) {
+ case ERROR:
+ errorTasks.add(runnable);
+ runningTasks.remove(runnable);
+ break;
+ case RUNNING:
+ if (isNeedStop(taskConfig, taskConfigs)) {
+ try {
+ // remove committer offset
+ sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
+ workerTask.doClose();
+ } catch (Exception e) {
+ log.error("workerTask stop exception, workerTask: " + workerTask.currentTaskConfig(), e);
}
+ log.info("Task stopping, connector name {}, config {}", workerTask.id().connector(), workerTask.currentTaskConfig());
+ runningTasks.remove(runnable);
+ stoppingTasks.put(runnable, System.currentTimeMillis());
}
+ break;
+ default:
+ log.error("[BUG] Illegal State in when checking running tasks, {} is in {} state",
+ ((WorkerTask) runnable).id().connector(), state);
+ break;
+ }
+ }
+ }
+
+    /**
+     * Decides whether a running task must be stopped because its configuration is no longer
+     * among the configurations supplied for its connector.
+     *
+     * @param taskConfig configuration the running task currently uses
+     * @param keyValues  task configurations looked up for the task's connector
+     * @return {@code true} when {@code keyValues} is empty or none of its entries equals
+     *         {@code taskConfig}; {@code false} as soon as one entry equals it
+     */
+    private boolean isNeedStop(ConnectKeyValue taskConfig, List<ConnectKeyValue> keyValues) {
+        if (CollectionUtils.isEmpty(keyValues)) {
+            return true;
+        }
+        // the task keeps running only while one of the supplied configs still equals its own
+        return keyValues.stream().noneMatch(candidate -> candidate.equals(taskConfig));
+    }
+
+    /**
+     * Finalizes every task in {@code stoppedTasks}: runs the task's cleanup, waits up to
+     * {@code workerConfig.getMaxStartTimeoutMills()} for its future, unregisters the task from the
+     * source offset committer, cancels the future and moves the task to
+     * {@code cleanedStoppedTasks}.
+     * <p>
+     * A task whose future is missing from {@code taskToFutureMap} is reported as a bug but is still
+     * moved to {@code cleanedStoppedTasks}, so it is not revisited on the next pass.
+     * <p>
+     * NOTE(review): the wait uses the start timeout rather than the stop timeout - confirm this is
+     * intended.
+     */
+    private void checkStoppedTasks() {
+        for (Runnable runnable : stoppedTasks) {
+            WorkerTask workerTask = (WorkerTask) runnable;
+            workerTask.cleanup();
+            Future future = taskToFutureMap.get(runnable);
+            try {
+                if (null != future) {
+                    future.get(workerConfig.getMaxStartTimeoutMills(), TimeUnit.MILLISECONDS);
+                } else {
+                    log.error("[BUG] stopped Tasks reference not found in taskFutureMap");
}
+            } catch (ExecutionException e) {
+                // report the failure raised inside the task through the logger, not stderr
+                log.error("[BUG] Stopped Tasks should not throw any exception", e.getCause());
+            } catch (CancellationException e) {
+                log.error("[BUG] Stopped Tasks throws PrintStackTrace", e);
+            } catch (TimeoutException e) {
+                log.error("[BUG] Stopped Tasks should not throw any exception", e);
+            } catch (InterruptedException e) {
+                log.error("[BUG] Stopped Tasks should not throw any exception", e);
+            } finally {
+                // remove committer offset
+                sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
+                // the future is absent in the bug case logged above; cancelling it would throw an NPE
+                // and leave the task stuck in stoppedTasks
+                if (null != future) {
+                    future.cancel(true);
+                }
+                taskToFutureMap.remove(runnable);
+                stoppedTasks.remove(runnable);
+                cleanedStoppedTasks.add(runnable);
+            }
+        }
+    }
- if (needStop) {
- try {
- workerTask.stop();
- } catch (Exception e) {
- log.error("workerTask stop exception, workerTask: " + workerTask.getTaskConfig(), e);
- }
- log.info("Task stopping, connector name {}, config {}", workerTask.getConnectorName(), workerTask.getTaskConfig());
- runningTasks.remove(runnable);
- stoppingTasks.put(runnable, System.currentTimeMillis());
- needCommitPosition = true;
+    /**
+     * Finalizes every task in {@code errorTasks}: waits up to
+     * {@code workerConfig.getMaxStopTimeoutMills()} for its future, unregisters the task from the
+     * source offset committer, cancels the future, runs the task's cleanup and moves the task to
+     * {@code cleanedErrorTasks}.
+     * <p>
+     * A task whose future is missing from {@code taskToFutureMap} is reported as a bug but is still
+     * cleaned up and moved, so it is not revisited on the next pass.
+     */
+    private void checkErrorTasks() {
+        for (Runnable runnable : errorTasks) {
+            WorkerTask workerTask = (WorkerTask) runnable;
+            Future future = taskToFutureMap.get(runnable);
+            try {
+                if (null != future) {
+                    future.get(workerConfig.getMaxStopTimeoutMills(), TimeUnit.MILLISECONDS);
+                } else {
+                    log.error("[BUG] errorTasks reference not found in taskFutureMap");
}
-            } else {
-                log.error("[BUG] Illegal State in when checking running tasks, {} is in {} state",
-                    ((WorkerTask) runnable).getConnectorName(), state.toString());
+            } catch (ExecutionException e) {
+                log.error("Execution exception , {}", e);
+            } catch (CancellationException | TimeoutException | InterruptedException e) {
+                log.error("error, {}", e);
+            } finally {
+                // remove committer offset
+                sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
+
+                // the future is absent in the bug case logged above; cancelling it would throw an NPE
+                // and skip the cleanup and bookkeeping below
+                if (null != future) {
+                    future.cancel(true);
+                }
+                workerTask.cleanup();
+                taskToFutureMap.remove(runnable);
+                errorTasks.remove(runnable);
+                cleanedErrorTasks.add(runnable);
}
}
+    }
- //If some tasks are closed, synchronize the position.
- if (needCommitPosition) {
- taskPositionCommitService.commitTaskPosition();
+    /**
+     * Sorts every task in {@code stoppingTasks} by its current state: STOPPED tasks move to
+     * {@code stoppedTasks}, ERROR tasks move to {@code errorTasks}, and tasks still STOPPING for
+     * longer than {@code workerConfig.getMaxStopTimeoutMills()} are timed out and moved to
+     * {@code errorTasks}. Any other state is logged as a bug and the task is left in place.
+     */
+    private void checkStoppingTasks() {
+        for (Map.Entry<Runnable, Long> stopping : stoppingTasks.entrySet()) {
+            Runnable task = stopping.getKey();
+            Long stopRequestedAt = stopping.getValue();
+            long now = System.currentTimeMillis();
+            Future taskFuture = taskToFutureMap.get(task);
+            WorkerTask workerTask = (WorkerTask) task;
+            WorkerTaskState state = workerTask.getState();
+            switch (state) {
+                case STOPPED:
+                    // the task exited normally, so its future is expected to be present and done
+                    boolean futureDone = null != taskFuture && taskFuture.isDone();
+                    if (!futureDone) {
+                        log.error("[BUG] future is null or Stopped task should have its Future.isDone() true, but false");
+                    }
+                    stoppingTasks.remove(task);
+                    stoppedTasks.add(task);
+                    break;
+                case ERROR:
+                    stoppingTasks.remove(task);
+                    errorTasks.add(task);
+                    break;
+                case STOPPING:
+                    // still stopping: give up once the stop timeout has elapsed
+                    long waited = now - stopRequestedAt;
+                    if (waited > workerConfig.getMaxStopTimeoutMills()) {
+                        workerTask.timeout();
+                        stoppingTasks.remove(task);
+                        errorTasks.add(task);
+                    }
+                    break;
+                default:
+                    log.error("[BUG] Illegal State in when checking stopping tasks, {} is in {} state",
+                        workerTask.id().connector(), state.toString());
+                    break;
+            }
}
+    }
- // get new Tasks
- Map<String, List<ConnectKeyValue>> newTasks = new HashMap<>();
- for (String connectorName : taskConfigs.keySet()) {
- for (ConnectKeyValue keyValue : taskConfigs.get(connectorName)) {
- boolean isNewTask = true;
- if (isConfigInSet(keyValue, runningTasks) || isConfigInSet(keyValue, pendingTasks.keySet()) || isConfigInSet(keyValue, errorTasks)) {
- isNewTask = false;
- }
- if (isNewTask) {
- if (!newTasks.containsKey(connectorName)) {
- newTasks.put(connectorName, new ArrayList<>());
+ private void checkPendingTask() {
+ for (Map.Entry<Runnable, Long> entry : pendingTasks.entrySet()) {
+ Runnable runnable = entry.getKey();
+ Long startTimestamp = entry.getValue();
+ Long currentTimeMillis = System.currentTimeMillis();
+ WorkerTaskState state = ((WorkerTask) runnable).getState();
+ switch (state) {
+ case ERROR:
+ errorTasks.add(runnable);
+ pendingTasks.remove(runnable);
+ break;
+ case RUNNING:
+ runningTasks.add(runnable);
+ pendingTasks.remove(runnable);
+ break;
+ case NEW:
+ log.info("[RACE CONDITION] we checked the pending tasks before state turns to PENDING");
+ break;
+ case PENDING:
+ if (currentTimeMillis - startTimestamp > workerConfig.getMaxStartTimeoutMills()) {
+ ((WorkerTask) runnable).timeout();
+ pendingTasks.remove(runnable);
+ errorTasks.add(runnable);
}
- log.info("Add new tasks,connector name {}, config {}", connectorName, keyValue);
- newTasks.get(connectorName).add(keyValue);
- }
+ break;
+ default:
+ log.error("[BUG] Illegal State in when checking pending tasks, {} is in {} state",
+ ((WorkerTask) runnable).id().connector(), state.toString());
+ break;
}
}
+ }
- // STEP 2: try to create new tasks
- int taskId = 0;
+ /**
+ * start task
+ *
+ * @param newTasks
+ * @throws Exception
+ */
+ private void startTask(Map<String, List<ConnectKeyValue>> newTasks) throws Exception {
for (String connectorName : newTasks.keySet()) {
for (ConnectKeyValue keyValue : newTasks.get(connectorName)) {
+ int taskId = keyValue.getInt(RuntimeConfigDefine.TASK_ID);
+ ConnectorTaskId id = new ConnectorTaskId(connectorName, taskId);
String taskType = keyValue.getString(RuntimeConfigDefine.TASK_TYPE);
if (TaskType.DIRECT.name().equalsIgnoreCase(taskType)) {
- createDirectTask(connectorName, keyValue);
+ createDirectTask(id, keyValue);
continue;
}
try {
@@ -422,7 +570,7 @@ public class Worker {
RecordConverter recordConverter = null;
if (StringUtils.isNotEmpty(converterClazzName)) {
recordConverter = Class.forName(converterClazzName)
- .asSubclass(io.openmessaging.connector.api.data.RecordConverter.class)
+ .asSubclass(RecordConverter.class)
.getDeclaredConstructor()
.newInstance();
recordConverter.configure(keyValue.getProperties());
@@ -432,36 +580,36 @@ public class Worker {
Plugin.compareAndSwapLoaders(loader);
}
if (task instanceof SourceTask) {
-
- DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(connectConfig);
+ DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig);
TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
// create retry operator
RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(connectorName, keyValue));
- WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName,
- (SourceTask) task, keyValue, positionManagementService, recordConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
+ WorkerSourceTask workerSourceTask = new WorkerSourceTask(workerConfig, id,
+ (SourceTask) task, loader, keyValue, positionManagementService, recordConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
Plugin.compareAndSwapLoaders(currentThreadLoader);
-
Future future = taskExecutor.submit(workerSourceTask);
+ // schedule offset committer
+ sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, workerSourceTask));
+
taskToFutureMap.put(workerSourceTask, future);
this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());
+
} else if (task instanceof SinkTask) {
log.info("sink task config keyValue is {}", keyValue.getProperties());
- DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig, connectorName, keyValue, ++taskId);
- Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+ DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(workerConfig, id, keyValue);
+ Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(workerConfig);
if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
- ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
+ ConnectUtil.createSubGroup(workerConfig, consumer.getConsumerGroup());
}
TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
-
// create retry operator
RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
- retryWithToleranceOperator.reporters(ReporterManagerUtil.sinkTaskReporters(connectorName, keyValue, connectConfig));
+ retryWithToleranceOperator.reporters(ReporterManagerUtil.sinkTaskReporters(connectorName, keyValue, workerConfig));
-
- WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName,
- (SinkTask) task, keyValue, recordConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
+ WorkerSinkTask workerSinkTask = new WorkerSinkTask(workerConfig, id,
+ (SinkTask) task, loader, keyValue, recordConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, recordConverter));
Plugin.compareAndSwapLoaders(currentThreadLoader);
Future future = taskExecutor.submit(workerSinkTask);
@@ -473,137 +621,59 @@ public class Worker {
}
}
}
+ }
- // STEP 3: check all pending state
- for (Map.Entry<Runnable, Long> entry : pendingTasks.entrySet()) {
- Runnable runnable = entry.getKey();
- Long startTimestamp = entry.getValue();
- Long currentTimeMillis = System.currentTimeMillis();
- WorkerTaskState state = ((WorkerTask) runnable).getState();
-
- if (WorkerTaskState.ERROR == state) {
- errorTasks.add(runnable);
- pendingTasks.remove(runnable);
- } else if (WorkerTaskState.RUNNING == state) {
- runningTasks.add(runnable);
- pendingTasks.remove(runnable);
- } else if (WorkerTaskState.NEW == state) {
- log.info("[RACE CONDITION] we checked the pending tasks before state turns to PENDING");
- } else if (WorkerTaskState.PENDING == state) {
- if (currentTimeMillis - startTimestamp > MAX_START_TIMEOUT_MILLS) {
- ((WorkerTask) runnable).timeout();
- pendingTasks.remove(runnable);
- errorTasks.add(runnable);
- }
- } else {
- log.error("[BUG] Illegal State in when checking pending tasks, {} is in {} state",
- ((WorkerTask) runnable).getConnectorName(), state.toString());
- }
- }
-
- // STEP 4 check stopping tasks
- for (Map.Entry<Runnable, Long> entry : stoppingTasks.entrySet()) {
- Runnable runnable = entry.getKey();
- Long stopTimestamp = entry.getValue();
- Long currentTimeMillis = System.currentTimeMillis();
- Future future = taskToFutureMap.get(runnable);
- WorkerTaskState state = ((WorkerTask) runnable).getState();
- // exited normally
-
- if (WorkerTaskState.STOPPED == state) {
- // concurrent modification Exception ? Will it pop that in the
-
- if (null == future || !future.isDone()) {
- log.error("[BUG] future is null or Stopped task should have its Future.isDone() true, but false");
- }
- stoppingTasks.remove(runnable);
- stoppedTasks.add(runnable);
- } else if (WorkerTaskState.ERROR == state) {
- stoppingTasks.remove(runnable);
- errorTasks.add(runnable);
- } else if (WorkerTaskState.STOPPING == state) {
- if (currentTimeMillis - stopTimestamp > MAX_STOP_TIMEOUT_MILLS) {
- ((WorkerTask) runnable).timeout();
- stoppingTasks.remove(runnable);
- errorTasks.add(runnable);
- }
- } else {
-
- log.error("[BUG] Illegal State in when checking stopping tasks, {} is in {} state",
- ((WorkerTask) runnable).getConnectorName(), state.toString());
- }
- }
-
- // STEP 5 check errorTasks and stopped tasks
- for (Runnable runnable : errorTasks) {
- WorkerTask workerTask = (WorkerTask) runnable;
- Future future = taskToFutureMap.get(runnable);
-
- try {
- if (null != future) {
- future.get(1000 * 60, TimeUnit.MILLISECONDS);
- } else {
- log.error("[BUG] errorTasks reference not found in taskFutureMap");
+ private Map<String, List<ConnectKeyValue>> newTasks(Map<String, List<ConnectKeyValue>> taskConfigs) {
+ Map<String, List<ConnectKeyValue>> newTasks = new HashMap<>();
+ for (String connectorName : taskConfigs.keySet()) {
+ for (ConnectKeyValue keyValue : taskConfigs.get(connectorName)) {
+ boolean isNewTask = true;
+ if (isConfigInSet(keyValue, runningTasks) || isConfigInSet(keyValue, pendingTasks.keySet()) || isConfigInSet(keyValue, errorTasks)) {
+ isNewTask = false;
}
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- log.error("Execution exception , {}", e);
- } catch (CancellationException | TimeoutException | InterruptedException e) {
- log.error("error, {}", e);
- } finally {
- future.cancel(true);
- workerTask.cleanup();
- taskToFutureMap.remove(runnable);
- errorTasks.remove(runnable);
- cleanedErrorTasks.add(runnable);
-
- }
- }
-
- // STEP 5 check errorTasks and stopped tasks
- for (Runnable runnable : stoppedTasks) {
- WorkerTask workerTask = (WorkerTask) runnable;
- workerTask.cleanup();
- Future future = taskToFutureMap.get(runnable);
- try {
- if (null != future) {
- future.get(1000, TimeUnit.MILLISECONDS);
- } else {
- log.error("[BUG] stopped Tasks reference not found in taskFutureMap");
+ if (isNewTask) {
+ if (!newTasks.containsKey(connectorName)) {
+ newTasks.put(connectorName, new ArrayList<>());
+ }
+ log.info("Add new tasks,connector name {}, config {}", connectorName, keyValue);
+ newTasks.get(connectorName).add(keyValue);
}
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- log.info("[BUG] Stopped Tasks should not throw any exception");
- t.printStackTrace();
- } catch (CancellationException e) {
- log.info("[BUG] Stopped Tasks throws PrintStackTrace");
- e.printStackTrace();
- } catch (TimeoutException e) {
- log.info("[BUG] Stopped Tasks should not throw any exception");
- e.printStackTrace();
- } catch (InterruptedException e) {
- log.info("[BUG] Stopped Tasks should not throw any exception");
- e.printStackTrace();
- } finally {
- future.cancel(true);
- taskToFutureMap.remove(runnable);
- stoppedTasks.remove(runnable);
- cleanedStoppedTasks.add(runnable);
}
}
+ return newTasks;
}
- private void createDirectTask(String connectorName, ConnectKeyValue keyValue) throws Exception {
+ private void createDirectTask(ConnectorTaskId id, ConnectKeyValue keyValue) throws Exception {
String sourceTaskClass = keyValue.getString(RuntimeConfigDefine.SOURCE_TASK_CLASS);
Task sourceTask = getTask(sourceTaskClass);
String sinkTaskClass = keyValue.getString(RuntimeConfigDefine.SINK_TASK_CLASS);
Task sinkTask = getTask(sinkTaskClass);
- WorkerDirectTask workerDirectTask = new WorkerDirectTask(connectorName,
- (SourceTask) sourceTask, (SinkTask) sinkTask, keyValue, positionManagementService, workerState);
+ TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
+ // create retry operator
+ RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
+ retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(id.connector(), keyValue));
+
+ WorkerDirectTask workerDirectTask = new WorkerDirectTask(
+ workerConfig,
+ id,
+ (SourceTask) sourceTask,
+ null,
+ (SinkTask) sinkTask,
+ keyValue,
+ positionManagementService,
+ workerState,
+ connectStatsManager,
+ connectStatsService,
+ transformChain,
+ retryWithToleranceOperator);
Future future = taskExecutor.submit(workerDirectTask);
+
+ // schedule offset committer
+ sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, workerDirectTask));
+
taskToFutureMap.put(workerDirectTask, future);
this.pendingTasks.put(workerDirectTask, System.currentTimeMillis());
}
@@ -632,7 +702,6 @@ public class Worker {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
-
while (!this.isStopped()) {
this.waitForRunning(1000);
try {
@@ -642,7 +711,6 @@ public class Worker {
log.error("RebalanceImpl#StateMachineService start connector or task failed", e);
}
}
-
log.info(this.getServiceName() + " service end");
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index 3a56d72..ce56766 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -27,35 +27,32 @@ import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.storage.OffsetStorageReader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.collections.MapUtils;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
-import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* A wrapper of {@link SinkTask} and {@link SourceTask} for runtime.
*/
-public class WorkerDirectTask implements WorkerTask {
+public class WorkerDirectTask extends WorkerSourceTask {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
- /**
- * Connector name of current task.
- */
- private String connectorName;
-
/**
* The implements of the source task.
*/
@@ -65,142 +62,72 @@ public class WorkerDirectTask implements WorkerTask {
* The implements of the sink task.
*/
private SinkTask sinkTask;
-
- /**
- * The configs of current sink task.
- */
- private ConnectKeyValue taskConfig;
-
- /**
- * Atomic state variable
- */
- private AtomicReference<WorkerTaskState> state;
-
- private final PositionManagementService positionManagementService;
-
private final OffsetStorageReader positionStorageReader;
- private final PositionStorageWriter positionStorageWriter;
-
- private final AtomicReference<WorkerState> workerState;
-
- public WorkerDirectTask(String connectorName,
- SourceTask sourceTask,
- SinkTask sinkTask,
- ConnectKeyValue taskConfig,
- PositionManagementService positionManagementService,
- AtomicReference<WorkerState> workerState) {
- this.connectorName = connectorName;
+ public WorkerDirectTask(ConnectConfig workerConfig,
+ ConnectorTaskId id,
+ SourceTask sourceTask,
+ ClassLoader classLoader,
+ SinkTask sinkTask,
+ ConnectKeyValue taskConfig,
+ PositionManagementService positionManagementService,
+ AtomicReference<WorkerState> workerState,
+ ConnectStatsManager connectStatsManager,
+ ConnectStatsService connectStatsService,
+ TransformChain<ConnectRecord> transformChain,
+ RetryWithToleranceOperator retryWithToleranceOperator) {
+ super(workerConfig,
+ id,
+ sourceTask,
+ classLoader,
+ taskConfig,
+ positionManagementService,
+ null,
+ null,
+ workerState,
+ connectStatsManager,
+ connectStatsService,
+ transformChain,
+ retryWithToleranceOperator
+ );
this.sourceTask = sourceTask;
this.sinkTask = sinkTask;
- this.taskConfig = taskConfig;
- this.positionManagementService = positionManagementService;
- this.positionStorageReader = new PositionStorageReaderImpl(connectorName, positionManagementService);
- this.positionStorageWriter = new PositionStorageWriter(connectorName, positionManagementService);
- this.state = new AtomicReference<>(WorkerTaskState.NEW);
- this.workerState = workerState;
- }
-
- /**
- * Start a source task, and send data entry to MQ cyclically.
- */
- @Override
- public void run() {
- try {
- starkSinkTask();
- startSourceTask();
- log.info("Direct task start, config:{}", JSON.toJSONString(taskConfig));
- while (WorkerState.STARTED == workerState.get() && WorkerTaskState.RUNNING == state.get()) {
- try {
- Collection<ConnectRecord> toSendEntries = sourceTask.poll();
- if (null != toSendEntries && toSendEntries.size() > 0) {
- sendRecord(toSendEntries);
- }
- } catch (Exception e) {
- log.error("Direct task runtime exception", e);
- state.set(WorkerTaskState.ERROR);
- }
- }
- stopSourceTask();
- stopSinkTask();
- state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED);
- log.info("Direct task stop, config:{}", JSON.toJSONString(taskConfig));
- } catch (Exception e) {
- log.error("Run task failed.", e);
- state.set(WorkerTaskState.ERROR);
- }
+ this.positionStorageReader = new PositionStorageReaderImpl(id.connector(), positionManagementService);
}
private void sendRecord(Collection<ConnectRecord> sourceDataEntries) {
- List<ConnectRecord> sinkDataEntries = new ArrayList<>(sourceDataEntries.size());
- Map<RecordPartition, RecordOffset> map = new HashMap<>();
- for (ConnectRecord sourceDataEntry : sourceDataEntries) {
- sinkDataEntries.add(sourceDataEntry);
- RecordPartition recordPartition = sourceDataEntry.getPosition().getPartition();
- RecordOffset recordOffset = sourceDataEntry.getPosition().getOffset();
- if (null != recordPartition && null != recordOffset) {
- map.put(recordPartition, recordOffset);
+ List<ConnectRecord> records = new ArrayList<>(sourceDataEntries.size());
+ List<RecordOffsetManagement.SubmittedPosition> positions = new ArrayList<>();
+ for (ConnectRecord preTransformRecord : sourceDataEntries) {
+
+ retryWithToleranceOperator.sourceRecord(preTransformRecord);
+ ConnectRecord record = transformChain.doTransforms(preTransformRecord);
+ if (record == null) {
+ continue;
}
+
+ records.add(record);
+ /**prepare to send record*/
+ positions.add(prepareToSendRecord(preTransformRecord).get());
+
}
try {
- sinkTask.put(sinkDataEntries);
- try {
- if (!MapUtils.isEmpty(map)) {
- map.forEach(positionStorageWriter::putPosition);
- }
- } catch (Exception e) {
- log.error("Source task save position info failed.", e);
- }
+ sinkTask.put(records);
+ // ack
+ positions.forEach(submittedPosition -> {
+ submittedPosition.ack();
+ });
} catch (Exception e) {
+ // drop commit
+ positions.forEach(submittedPosition -> {
+ submittedPosition.remove();
+ });
log.error("Send message error, error info: {}.", e);
}
}
private void starkSinkTask() {
- sinkTask.init(new SinkTaskContext() {
-
- @Override
- public String getConnectorName() {
- return taskConfig.getString(RuntimeConfigDefine.CONNECTOR_ID);
- }
-
- @Override
- public String getTaskName() {
- return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
- }
-
- /**
- * Get the configurations of current task.
- *
- * @return the configuration of current task.
- */
- @Override
- public KeyValue configs() {
- return taskConfig;
- }
- @Override
- public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
-
- }
- @Override
- public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
-
- }
-
- @Override
- public void pause(List<RecordPartition> partitions) {
-
- }
-
- @Override
- public void resume(List<RecordPartition> partitions) {
-
- }
- @Override
- public Set<RecordPartition> assignment() {
- return null;
- }
- });
+ sinkTask.init(new DirectSinkTaskContext());
sinkTask.start(taskConfig);
log.info("Sink task start, config:{}", JSON.toJSONString(taskConfig));
}
@@ -211,32 +138,8 @@ public class WorkerDirectTask implements WorkerTask {
}
private void startSourceTask() {
- state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
- sourceTask.init(new SourceTaskContext() {
- @Override public OffsetStorageReader offsetStorageReader() {
- return positionStorageReader;
- }
-
- @Override public String getConnectorName() {
- return taskConfig.getString(RuntimeConfigDefine.CONNECTOR_ID);
- }
-
- @Override public String getTaskName() {
- return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
- }
-
- /**
- * Get the configurations of current task.
- *
- * @return the configuration of current task.
- */
- @Override
- public KeyValue configs() {
- return taskConfig;
- }
- });
+ sourceTask.init(new DirectSourceTaskContext());
sourceTask.start(taskConfig);
- state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
log.info("Source task start, config:{}", JSON.toJSONString(taskConfig));
}
@@ -245,46 +148,175 @@ public class WorkerDirectTask implements WorkerTask {
log.info("Source task stop, config:{}", JSON.toJSONString(taskConfig));
}
+ /**
+     * initialize and start
+ */
@Override
- public WorkerTaskState getState() {
- return this.state.get();
- }
-
- @Override
- public void stop() {
- state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
+ protected void initializeAndStart() {
+ starkSinkTask();
+ startSourceTask();
+ log.info("Direct task start, config:{}", JSON.toJSONString(taskConfig));
}
+ /**
+ * execute poll and send record
+ */
@Override
- public void cleanup() {
- if (state.compareAndSet(WorkerTaskState.STOPPED, WorkerTaskState.TERMINATED) ||
- state.compareAndSet(WorkerTaskState.ERROR, WorkerTaskState.TERMINATED)) {
- } else {
- log.error("[BUG] cleaning a task but it's not in STOPPED or ERROR state");
+    protected void execute() {
+        // poll the source task and hand each batch to the sink task for as long as the task runs
+        while (isRunning()) {
+            updateCommittableOffsets();
+            try {
+                Collection<ConnectRecord> toSendEntries = sourceTask.poll();
+                // forward only real batches: skip a null or empty poll result instead of
+                // forwarding empty batches and dropping every non-empty one
+                if (null != toSendEntries && !toSendEntries.isEmpty()) {
+                    sendRecord(toSendEntries);
+                }
+            } catch (Exception e) {
+                log.error("Direct task runtime exception", e);
+                // commit offsets one last time, then report the failure
+                finalOffsetCommit(true);
+                onFailure(e);
+            }
}
}
+ /**
+ * close resources
+ */
@Override
- public String getConnectorName() {
- return connectorName;
+ public void close() {
+     // Stop the sink side even when stopping the source side throws, so neither task is left running.
+     try {
+         stopSourceTask();
+     } finally {
+         stopSinkTask();
+     }
  }
- @Override
- public ConnectKeyValue getTaskConfig() {
- return taskConfig;
- }
- @Override
- public Object getJsonObject() {
- HashMap obj = new HashMap<String, Object>();
- obj.put("connectorName", connectorName);
- obj.put("configs", JSON.toJSONString(taskConfig));
- obj.put("state", state.get().toString());
- return obj;
+ /**
+  * Sink task context handed to the sink side of a direct task.
+  * Offset reset, pause and resume requests are accepted but ignored, and the
+  * reported assignment is always empty.
+  */
+ private class DirectSinkTaskContext implements SinkTaskContext {
+
+     /**
+      * Get the connector name.
+      *
+      * @return connector name taken from the task id
+      */
+     @Override
+     public String getConnectorName() {
+         return id().connector();
+     }
+
+     /**
+      * Get the task name of the connector.
+      *
+      * @return the task part of the task id, rendered as a string
+      */
+     @Override
+     public String getTaskName() {
+         return String.valueOf(id().task());
+     }
+
+     /**
+      * Get the configurations of current task.
+      *
+      * @return the configuration of current task.
+      */
+     @Override
+     public KeyValue configs() {
+         return taskConfig;
+     }
+
+     /**
+      * Reset the consumer offset for the given queue. Ignored by this context.
+      *
+      * @param recordPartition the partition to reset offset.
+      * @param recordOffset the offset to reset to.
+      */
+     @Override
+     public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
+         // no-op
+     }
+
+     /**
+      * Reset the offsets for the given partitions. Ignored by this context.
+      *
+      * @param offsets the map of offsets for targetPartition.
+      */
+     @Override
+     public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
+         // no-op
+     }
+
+     /**
+      * Pause consumption of messages from the specified partitions. Ignored by this context.
+      *
+      * @param partitions the partition list to pause.
+      */
+     @Override
+     public void pause(List<RecordPartition> partitions) {
+         // no-op
+     }
+
+     /**
+      * Resume consumption of messages from previously paused partitions. Ignored by this context.
+      *
+      * @param partitions the partition list to resume.
+      */
+     @Override
+     public void resume(List<RecordPartition> partitions) {
+         // no-op
+     }
+
+     /**
+      * Current task assignment processing partition.
+      *
+      * @return an empty, unmodifiable set; never {@code null}, so callers can iterate without a null check
+      */
+     @Override
+     public Set<RecordPartition> assignment() {
+         return java.util.Collections.emptySet();
+     }
+ }
- @Override
- public void timeout() {
- this.state.set(WorkerTaskState.ERROR);
+
+ /**
+  * Source task context handed to the source side of a direct task.
+  */
+ private class DirectSourceTaskContext implements SourceTaskContext {
+
+     /**
+      * @return the offset storage reader held by the enclosing task
+      */
+     @Override
+     public OffsetStorageReader offsetStorageReader() {
+         return positionStorageReader;
+     }
+
+     /**
+      * @return the connector part of the task id
+      */
+     @Override
+     public String getConnectorName() {
+         return id().connector();
+     }
+
+     /**
+      * @return the task part of the task id, rendered as a string
+      */
+     @Override
+     public String getTaskName() {
+         return String.valueOf(id().task());
+     }
+
+     /**
+      * @return the configuration of the current task
+      */
+     @Override
+     public KeyValue configs() {
+         return taskConfig;
+     }
+ }
+
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 6081fdd..70fb7bc 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -28,20 +28,6 @@ import io.openmessaging.connector.api.data.SchemaAndValue;
import io.openmessaging.connector.api.errors.ConnectException;
import io.openmessaging.connector.api.errors.RetriableException;
import io.openmessaging.internal.DefaultKeyValue;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -56,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.common.QueueState;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
import org.apache.rocketmq.connect.runtime.errors.ErrorReporter;
@@ -64,53 +51,37 @@ import org.apache.rocketmq.connect.runtime.errors.WorkerErrorRecordReporter;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* A wrapper of {@link SinkTask} for runtime.
*/
-public class WorkerSinkTask implements WorkerTask {
+public class WorkerSinkTask extends WorkerTask {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
- /**
- * The configuration key that provides the list of topicNames that are inputs for this SinkTask.
- */
- public static final String QUEUENAMES_CONFIG = "topicNames";
-
- /**
- * The configuration key that provide the list of topicQueues that are inputs for this SinkTask; The config value
- * format is topicName1,brokerName1,queueId1;topicName2,brokerName2,queueId2, use topicName1, brokerName1, queueId1
- * can construct {@link MessageQueue}
- */
- public static final String TOPIC_QUEUES_CONFIG = "topicQueues";
-
- /**
- * Connector name of current task.
- */
- private String connectorName;
-
/**
* The implements of the sink task.
*/
private SinkTask sinkTask;
- /**
- * The configs of current sink task.
- */
- private ConnectKeyValue taskConfig;
-
- /**
- * Atomic state variable
- */
- private AtomicReference<WorkerTaskState> state;
-
- /**
- * Stop retry limit
- */
-
/**
* A RocketMQ consumer to pull message from MQ.
*/
@@ -144,23 +115,17 @@ public class WorkerSinkTask implements WorkerTask {
private static final long PULL_MSG_ERROR_BACKOFF_MS = 1000 * 10;
private static final long PULL_NO_MSG_BACKOFF_MS = 1000 * 3;
-
private static final long PULL_MSG_ERROR_THRESHOLD = 16;
- private final AtomicReference<WorkerState> workerState;
-
+ /**
+ * Statistics collectors.
+ */
private final ConnectStatsManager connectStatsManager;
-
private final ConnectStatsService connectStatsService;
private final CountDownLatch stopPullMsgLatch;
-
private WorkerSinkTaskContext sinkTaskContext;
-
- private final TransformChain<ConnectRecord> transformChain;
-
private WorkerErrorRecordReporter errorRecordReporter;
- private RetryWithToleranceOperator retryWithToleranceOperator;
public static final String BROKER_NAME = "brokerName";
@@ -180,91 +145,31 @@ public class WorkerSinkTask implements WorkerTask {
}
};
- public WorkerSinkTask(String connectorName,
- SinkTask sinkTask,
- ConnectKeyValue taskConfig,
- RecordConverter recordConverter,
- DefaultMQPullConsumer consumer,
- AtomicReference<WorkerState> workerState,
- ConnectStatsManager connectStatsManager,
- ConnectStatsService connectStatsService,
- TransformChain<ConnectRecord> transformChain,
- RetryWithToleranceOperator retryWithToleranceOperator,
- WorkerErrorRecordReporter errorRecordReporter) {
- this.connectorName = connectorName;
+ public WorkerSinkTask(ConnectConfig workerConfig,
+ ConnectorTaskId id,
+ SinkTask sinkTask,
+ ClassLoader classLoader,
+ ConnectKeyValue taskConfig,
+ RecordConverter recordConverter,
+ DefaultMQPullConsumer consumer,
+ AtomicReference<WorkerState> workerState,
+ ConnectStatsManager connectStatsManager,
+ ConnectStatsService connectStatsService,
+ TransformChain<ConnectRecord> transformChain,
+ RetryWithToleranceOperator retryWithToleranceOperator,
+ WorkerErrorRecordReporter errorRecordReporter) {
+ super(workerConfig, id, classLoader, taskConfig, retryWithToleranceOperator, transformChain, workerState);
this.sinkTask = sinkTask;
- this.taskConfig = taskConfig;
this.consumer = consumer;
this.recordConverter = recordConverter;
this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256);
this.messageQueuesStateMap = new ConcurrentHashMap<>(256);
- this.state = new AtomicReference<>(WorkerTaskState.NEW);
- this.workerState = workerState;
this.connectStatsManager = connectStatsManager;
this.connectStatsService = connectStatsService;
this.stopPullMsgLatch = new CountDownLatch(1);
- this.transformChain = transformChain;
+ this.sinkTaskContext = new WorkerSinkTaskContext(taskConfig, this, consumer);
this.errorRecordReporter = errorRecordReporter;
- this.retryWithToleranceOperator = retryWithToleranceOperator;
- this.transformChain.retryWithToleranceOperator(retryWithToleranceOperator);
- }
- /**
- * Start a sink task, and receive data entry from MQ cyclically.
- */
- @Override
- public void run() {
- try {
- registTopics();
- consumer.start();
- log.info("Sink task consumer start. taskConfig {}", JSON.toJSONString(taskConfig));
- state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
- this.sinkTaskContext = new WorkerSinkTaskContext(taskConfig, this, consumer);
- sinkTask.init(sinkTaskContext);
- sinkTask.start(taskConfig);
- // we assume executed here means we are safe
- log.info("Sink task start, config:{}", JSON.toJSONString(taskConfig));
- state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
-
- while (WorkerState.STARTED == workerState.get() && WorkerTaskState.RUNNING == state.get()) {
- // this method can block up to 3 minutes long
- try {
- preCommit(false);
- setQueueOffset();
- pullMessageFromQueues();
- } catch (RetriableException e) {
- connectStatsManager.incSinkRecordPutTotalFailNums();
- connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- log.error("Sink task RetriableException exception", e);
- } catch (InterruptedException e) {
- connectStatsManager.incSinkRecordPutTotalFailNums();
- connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- log.error("Sink task InterruptedException exception", e);
- throw e;
- } catch (Throwable e) {
- state.set(WorkerTaskState.ERROR);
- log.error(" sink task {},pull message MQClientException, Error {} ", this, e.getMessage(), e);
- connectStatsManager.incSinkRecordPutTotalFailNums();
- connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- } finally {
- // record sink read times
- connectStatsManager.incSinkRecordReadTotalTimes();
- }
- }
-
- sinkTask.stop();
- state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED);
- log.info("Sink task stop, config:{}", JSON.toJSONString(taskConfig));
-
- } catch (Exception e) {
- log.error("Run task failed.", e);
- state.set(WorkerTaskState.ERROR);
- } finally {
- if (consumer != null) {
- consumer.shutdown();
- log.info("Sink task consumer shutdown. config:{}", JSON.toJSONString(taskConfig));
- }
- }
}
private void setQueueOffset() {
@@ -285,10 +190,13 @@ public class WorkerSinkTask implements WorkerTask {
this.sinkTaskContext.cleanQueuesOffsets();
}
- private void registTopics() {
+ /**
+ * Subscribe to the configured topics by registering a message queue listener for each one.
+ */
+ private void registryTopics() {
Set<String> topics = SinkConnectorConfig.parseTopicList(taskConfig);
if (org.apache.commons.collections4.CollectionUtils.isEmpty(topics)) {
- throw new ConnectException("sink connector topics config can be null, please check sink connector config info");
+ throw new ConnectException("Sink connector topics config can be null, please check sink connector config info");
}
for (String topic : topics) {
consumer.registerMessageQueueListener(topic, new MessageQueueListener() {
@@ -301,7 +209,6 @@ public class WorkerSinkTask implements WorkerTask {
messageQueuesOffsetMap.remove(key, value);
}
});
-
Set<RecordPartition> waitRemoveQueueMetaDatas = new HashSet<>();
recordPartitions.forEach(key -> {
if (key.getPartition().get("topic").equals(topic)) {
@@ -359,8 +266,11 @@ public class WorkerSinkTask implements WorkerTask {
}
public void incPullTPS(String topic, int pullSize) {
- consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
- .getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
+ consumer.getDefaultMQPullConsumerImpl()
+ .getRebalanceImpl()
+ .getmQClientFactory()
+ .getConsumerStatsManager()
+ .incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
}
private void pullMessageFromQueues() throws InterruptedException {
@@ -372,13 +282,12 @@ public class WorkerSinkTask implements WorkerTask {
}
for (Map.Entry<MessageQueue, Long> entry : messageQueuesOffsetMap.entrySet()) {
if (messageQueuesStateMap.containsKey(entry.getKey())) {
- log.warn("sink task message queue state is not running, sink task id {}, queue info {}, queue state {}", taskConfig.getString(RuntimeConfigDefine.TASK_ID), JSON.toJSONString(entry.getKey()), JSON.toJSONString(messageQueuesStateMap.get(entry.getKey())));
+ log.warn("sink task message queue state is not running, sink task id {}, queue info {}, queue state {}", id().toString(), JSON.toJSONString(entry.getKey()), JSON.toJSONString(messageQueuesStateMap.get(entry.getKey())));
continue;
}
log.info("START pullBlockIfNotFound, time started : {}", System.currentTimeMillis());
-
- if (WorkerTaskState.RUNNING != state.get()) {
- log.warn("sink task state is not running, sink task id {}, state {}", taskConfig.getString(RuntimeConfigDefine.TASK_ID), state.get().name());
+ if (isStopping()) {
+ log.warn("sink task state is not running, sink task id {}, state {}", id().toString(), state.get().name());
break;
}
PullResult pullResult = null;
@@ -386,95 +295,75 @@ public class WorkerSinkTask implements WorkerTask {
try {
shouldStopPullMsg();
pullResult = consumer.pull(entry.getKey(), "*", entry.getValue(), MAX_MESSAGE_NUM);
+ if (pullResult == null) {
+ continue;
+ }
pullMsgErrorCount = 0;
- } catch (MQClientException e) {
+ } catch (MQClientException | RemotingException | MQBrokerException e) {
pullMsgErrorCount++;
- log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQClientException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
- connectStatsManager.incSinkRecordReadTotalFailNums();
- connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
- connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
- connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
- } catch (RemotingException e) {
- pullMsgErrorCount++;
- log.error(" sink task message queue {}, offset {}, taskconfig {},pull message RemotingException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
- connectStatsManager.incSinkRecordReadTotalFailNums();
- connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
- connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
- connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
- } catch (MQBrokerException e) {
- pullMsgErrorCount++;
- log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQBrokerException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
- connectStatsManager.incSinkRecordReadTotalFailNums();
- connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
- connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
- connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
+ log.error(" sink task message queue {}, offset {}, taskconfig {},pull message {}, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getClass().getName(), e.getMessage(), this.state.get(), e);
+ readRecordFail(beginPullMsgTimestamp);
} catch (InterruptedException e) {
pullMsgErrorCount++;
log.error(" sink task message queue {}, offset {}, taskconfig {},pull message InterruptedException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
- connectStatsManager.incSinkRecordReadTotalFailNums();
- connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
- connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
- connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
+ readRecordFail(beginPullMsgTimestamp);
throw e;
} catch (Throwable e) {
pullMsgErrorCount++;
log.error(" sink task message queue {}, offset {}, taskconfig {},pull message Throwable, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), e);
- connectStatsManager.incSinkRecordReadTotalFailNums();
- connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
- connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
- connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
+ readRecordFail(beginPullMsgTimestamp);
throw e;
}
- long currentTime = System.currentTimeMillis();
List<MessageExt> messages = null;
- log.info("INSIDE pullMessageFromQueues, time elapsed : {}", currentTime - startTimeStamp);
- if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.FOUND)) {
- pullNotFountMsgCount = 0;
- this.incPullTPS(entry.getKey().getTopic(), pullResult.getMsgFoundList().size());
- messages = pullResult.getMsgFoundList();
- connectStatsManager.incSinkRecordReadTotalNums(messages.size());
- connectStatsManager.incSinkRecordReadNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID), messages.size());
- long pullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
- connectStatsManager.incSinkRecordReadTotalRT(pullRT);
- connectStatsManager.incSinkRecordReadRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), pullRT);
- receiveMessages(messages);
- if (messageQueuesOffsetMap.containsKey(entry.getKey())) {
- messageQueuesOffsetMap.put(entry.getKey(), pullResult.getNextBeginOffset());
- } else {
- log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey()));
- }
- try {
- consumer.updateConsumeOffset(entry.getKey(), pullResult.getNextBeginOffset());
- } catch (MQClientException e) {
- log.warn("updateConsumeOffset MQClientException, pullResult {}", pullResult, e);
- }
- } else if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.OFFSET_ILLEGAL)) {
- log.warn("offset illegal, reset offset, message queue {}, pull offset {}, nextBeginOffset {}", JSON.toJSONString(entry.getKey()), entry.getValue(), pullResult.getNextBeginOffset());
- this.sinkTaskContext.resetOffset(ConnectUtil.convertToRecordPartition(entry.getKey()), ConnectUtil.convertToRecordOffset(pullResult.getNextBeginOffset()));
- } else if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.NO_NEW_MSG)) {
- pullNotFountMsgCount++;
- log.info("no new message, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
- } else if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.NO_MATCHED_MSG)) {
- log.info("no matched msg, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
- this.sinkTaskContext.resetOffset(ConnectUtil.convertToRecordPartition(entry.getKey()), ConnectUtil.convertToRecordOffset(pullResult.getNextBeginOffset()));
- } else {
- pullNotFountMsgCount++;
- log.info("unknow pull msg state, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
+ log.info("INSIDE pullMessageFromQueues, time elapsed : {}", System.currentTimeMillis() - startTimeStamp);
+ PullStatus status = pullResult.getPullStatus();
+ switch (status) {
+ case FOUND:
+ pullNotFountMsgCount = 0;
+ this.incPullTPS(entry.getKey().getTopic(), pullResult.getMsgFoundList().size());
+ messages = pullResult.getMsgFoundList();
+ recordReadSuccess(messages.size(), beginPullMsgTimestamp);
+ receiveMessages(messages);
+ if (messageQueuesOffsetMap.containsKey(entry.getKey())) {
+ // put offset
+ messageQueuesOffsetMap.put(entry.getKey(), pullResult.getNextBeginOffset());
+ } else {
+ // load balancing
+ log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey()));
+ }
+ try {
+ consumer.updateConsumeOffset(entry.getKey(), pullResult.getNextBeginOffset());
+ } catch (MQClientException e) {
+ log.warn("updateConsumeOffset MQClientException, pullResult {}", pullResult, e);
+ }
+ break;
+ case OFFSET_ILLEGAL:
+ log.warn("Offset illegal, reset offset, message queue {}, pull offset {}, nextBeginOffset {}", JSON.toJSONString(entry.getKey()), entry.getValue(), pullResult.getNextBeginOffset());
+ this.sinkTaskContext.resetOffset(ConnectUtil.convertToRecordPartition(entry.getKey()), ConnectUtil.convertToRecordOffset(pullResult.getNextBeginOffset()));
+ break;
+ case NO_NEW_MSG:
+ pullNotFountMsgCount++;
+ log.info("No new message, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
+ break;
+ case NO_MATCHED_MSG:
+ log.info("no matched msg, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
+ this.sinkTaskContext.resetOffset(ConnectUtil.convertToRecordPartition(entry.getKey()), ConnectUtil.convertToRecordOffset(pullResult.getNextBeginOffset()));
+ break;
+ default:
+ pullNotFountMsgCount++;
+ log.info("unknow pull msg state, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
+ break;
}
- AtomicLong atomicLong = connectStatsService.singleSinkTaskTimesTotal(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+ AtomicLong atomicLong = connectStatsService.singleSinkTaskTimesTotal(id().toString());
if (null != atomicLong) {
atomicLong.addAndGet(org.apache.commons.collections4.CollectionUtils.isEmpty(messages) ? 0 : messages.size());
}
}
}
+
private void shouldStopPullMsg() throws InterruptedException {
if (pullMsgErrorCount == PULL_MSG_ERROR_THRESHOLD) {
log.error("Accumulative error {} times, stop pull msg for {} ms", pullMsgErrorCount, PULL_MSG_ERROR_BACKOFF_MS);
@@ -508,37 +397,15 @@ public class WorkerSinkTask implements WorkerTask {
}
}
- private void removePauseQueueMessage(MessageQueue messageQueue, List<MessageExt> messages) {
- if (null != messageQueuesStateMap.get(messageQueue)) {
- final Iterator<MessageExt> iterator = messages.iterator();
- while (iterator.hasNext()) {
- final MessageExt message = iterator.next();
- String msgId = message.getMsgId();
- log.info("BrokerName {}, topicName {}, queueId {} is pause, Discard the message {}", messageQueue.getBrokerName(), messageQueue.getTopic(), message.getQueueId(), msgId);
- iterator.remove();
- }
- }
- }
-
@Override
- public void stop() {
- state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
- try {
- transformChain.close();
- } catch (Exception exception) {
- log.error("Transform close failed, {}", exception);
- }
+ public void close() {
+     // Release every resource even when an earlier step throws; an exception still propagates to the caller.
+     try {
+         sinkTask.stop();
+     } finally {
+         try {
+             consumer.shutdown();
+         } finally {
+             stopPullMsgLatch.countDown();
+             Utils.closeQuietly(transformChain, "transform chain");
+             Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
+         }
+     }
  }
- @Override
- public void cleanup() {
- if (state.compareAndSet(WorkerTaskState.STOPPED, WorkerTaskState.TERMINATED) ||
- state.compareAndSet(WorkerTaskState.ERROR, WorkerTaskState.TERMINATED))
- consumer.shutdown();
- else {
- log.error("[BUG] cleaning a task but it's not in STOPPED or ERROR state");
- }
- }
/**
* receive message from MQ.
@@ -546,28 +413,17 @@ public class WorkerSinkTask implements WorkerTask {
* @param messages
*/
private void receiveMessages(List<MessageExt> messages) {
- List<ConnectRecord> sinkDataEntries = new ArrayList<>(32);
+ List<ConnectRecord> records = new ArrayList<>(32);
for (MessageExt message : messages) {
this.retryWithToleranceOperator.consumerRecord(message);
- ConnectRecord sinkDataEntry = this.retryWithToleranceOperator.execute(() -> convertToSinkDataEntry(message), ErrorReporter.Stage.CONVERTER, WorkerSinkTask.class);
- if (sinkDataEntry != null && !this.retryWithToleranceOperator.failed())
- sinkDataEntries.add(sinkDataEntry);
- String msgId = message.getMsgId();
- log.info("Received one message success : msgId {}", msgId);
- }
- List<ConnectRecord> connectRecordList = new ArrayList<>(32);
- for (ConnectRecord connectRecord : sinkDataEntries) {
- ConnectRecord connectRecord1 = this.transformChain.doTransforms(connectRecord);
- if (null != connectRecord1) {
- connectRecordList.add(connectRecord1);
+ ConnectRecord connectRecord = convertMessages(message);
+ if (connectRecord != null && !this.retryWithToleranceOperator.failed()) {
+ records.add(connectRecord);
}
- }
- if (CollectionUtils.isEmpty(connectRecordList)) {
- log.info("after transforms connectRecordList is null");
- return;
+ log.info("Received one message success : msgId {}", message.getMsgId());
}
try {
- sinkTask.put(connectRecordList);
+ sinkTask.put(records);
return;
} catch (RetriableException e) {
log.error("task {} put sink recode RetriableException", this, e.getMessage(), e);
@@ -579,15 +435,14 @@ public class WorkerSinkTask implements WorkerTask {
}
- private ConnectRecord convertToSinkDataEntry(MessageExt message) {
+ private ConnectRecord convertMessages(MessageExt message) {
Map<String, String> properties = message.getProperties();
- ConnectRecord sinkDataEntry;
-
+ ConnectRecord record;
// start convert
if (recordConverter == null) {
final byte[] messageBody = message.getBody();
String s = new String(messageBody);
- sinkDataEntry = JSON.parseObject(s, ConnectRecord.class);
+ record = JSON.parseObject(s, ConnectRecord.class);
} else {
// timestamp
String connectTimestamp = properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
@@ -600,11 +455,21 @@ public class WorkerSinkTask implements WorkerTask {
// convert
SchemaAndValue schemaAndValue = retryWithToleranceOperator.execute(() -> recordConverter.toConnectData(message.getTopic(), message.getBody()),
ErrorReporter.Stage.CONVERTER, recordConverter.getClass());
- sinkDataEntry = new ConnectRecord(recordPartition, recordOffset, timestamp, schemaAndValue.schema(), schemaAndValue.value());
+ record = new ConnectRecord(recordPartition, recordOffset, timestamp, schemaAndValue.schema(), schemaAndValue.value());
+ if (retryWithToleranceOperator.failed()) {
+ return null;
+ }
}
+
+ // Apply the transformations
+ ConnectRecord transformedRecord = transformChain.doTransforms(record);
+ if (transformedRecord == null) {
+ return null;
+ }
+
// add extension
- addExtension(properties, sinkDataEntry);
- return sinkDataEntry;
+ addExtension(properties, record);
+ return record;
}
private void addExtension(Map<String, String> properties, ConnectRecord sinkDataEntry) {
@@ -623,46 +488,48 @@ public class WorkerSinkTask implements WorkerTask {
sinkDataEntry.addExtension(keyValue);
}
- @Override
- public String getConnectorName() {
- return connectorName;
- }
-
- @Override
- public WorkerTaskState getState() {
- return state.get();
- }
-
- @Override
- public ConnectKeyValue getTaskConfig() {
- return taskConfig;
- }
-
/**
- * Further we cant try to log what caused the error
+ * initialize and start
*/
@Override
- public void timeout() {
- this.state.set(WorkerTaskState.ERROR);
+ protected void initializeAndStart() {
+     // Register the queue listeners before the consumer is started.
+     registryTopics();
+     try {
+         consumer.start();
+     } catch (MQClientException e) {
+         // Without a started consumer the task cannot pull anything, so fail the start instead of
+         // continuing silently and reporting success.
+         log.error("Sink task consumer start failed. taskConfig {}", JSON.toJSONString(taskConfig), e);
+         throw new ConnectException("Sink task consumer start failed", e);
+     }
+     log.info("Sink task consumer start. taskConfig {}", JSON.toJSONString(taskConfig));
+     sinkTask.init(sinkTaskContext);
+     sinkTask.start(taskConfig);
  }
+ /**
+ * execute poll and send record
+ */
@Override
- public String toString() {
-
- StringBuilder sb = new StringBuilder();
- sb.append("connectorName:" + connectorName)
- .append("\nConfigs:" + JSON.toJSONString(taskConfig))
- .append("\nState:" + state.get().toString());
- return sb.toString();
- }
+ protected void execute() { // Repeats preCommit, setQueueOffset and pullMessageFromQueues, in that order, while isRunning() holds.
+ while (isRunning()) {
+ // this method can block up to 3 minutes long
+ try {
+ preCommit(false);
+ setQueueOffset();
+ pullMessageFromQueues();
+ } catch (RetriableException e) { // counted as a failed read and logged; the loop moves on to the next pass
+ readRecordFailNum();
+ log.error("Sink task RetriableException exception", e);
+ } catch (InterruptedException e) { // NOTE(review): swallowed without restoring the interrupt flag, and the loop continues - confirm this is intended
+ readRecordFailNum();
+ log.error("Sink task InterruptedException exception", e);
+ } catch (Throwable e) { // anything else is logged, counted and rethrown, which ends the loop
+ log.error(" sink task {},pull message MQClientException, Error {} ", this, e.getMessage(), e);
+ readRecordFailNum();
+ throw e;
+ } finally {
+ // record sink read times
+ connectStatsManager.incSinkRecordReadTotalTimes();
+ }
+ }
- @Override
- public Object getJsonObject() {
- HashMap obj = new HashMap<String, Object>();
- obj.put("connectorName", connectorName);
- obj.put("configs", JSON.toJSONString(taskConfig));
- obj.put("state", state.get().toString());
- return obj;
}
public Set<RecordPartition> getRecordPartitions() {
@@ -688,13 +555,60 @@ public class WorkerSinkTask implements WorkerTask {
this.sinkTaskContext.resetOffset(offsets);
}
+
+ /**
+  * Discards every message in {@code messages} when {@code messageQueue} has an entry in the
+  * queue state map, logging each discarded message. Does nothing when the queue has no entry.
+  *
+  * @param messageQueue the queue the messages were pulled from
+  * @param messages     the pulled messages; emptied in place when the queue has a recorded state
+  */
+ private void removePauseQueueMessage(MessageQueue messageQueue, List<MessageExt> messages) {
+     if (messageQueuesStateMap.get(messageQueue) == null) {
+         return;
+     }
+     for (Iterator<MessageExt> cursor = messages.iterator(); cursor.hasNext(); ) {
+         final MessageExt discarded = cursor.next();
+         final String messageId = discarded.getMsgId();
+         log.info("BrokerName {}, topicName {}, queueId {} is pause, Discard the message {}", messageQueue.getBrokerName(), messageQueue.getTopic(), discarded.getQueueId(), messageId);
+         cursor.remove();
+     }
+ }
+
/**
* Returns the error record reporter held by this sink task.
+ *
* @return the {@link WorkerErrorRecordReporter} stored in this task's errorRecordReporter field
*/
public WorkerErrorRecordReporter errorRecordReporter() {
return errorRecordReporter;
}
+
+ /**
+  * Records a successful read: the record count and the time elapsed since {@code beginPullMsgTimestamp}.
+  */
+ private void recordReadSuccess(int recordSize, long beginPullMsgTimestamp) {
+     final long elapsedMs = System.currentTimeMillis() - beginPullMsgTimestamp;
+     recordReadNums(recordSize);
+     recordReadRT(elapsedMs);
+ }
+
+ /**
+  * Adds {@code size} to the global read counter and to the counter keyed by this task's id.
+  */
+ private void recordReadNums(int size) {
+     connectStatsManager.incSinkRecordReadTotalNums(size);
+     final String taskKey = id().toString();
+     connectStatsManager.incSinkRecordReadNums(taskKey, size);
+ }
+
+ /**
+  * Adds {@code pullRT} to the global read response-time statistic and to the one keyed by this task's id.
+  */
+ private void recordReadRT(long pullRT) {
+     connectStatsManager.incSinkRecordReadTotalRT(pullRT);
+     final String taskKey = id().toString();
+     connectStatsManager.incSinkRecordReadRT(taskKey, pullRT);
+ }
+
+
+ /**
+  * Records a failed read: one failure plus the time elapsed since {@code beginPullMsgTimestamp}.
+  */
+ private void readRecordFail(long beginPullMsgTimestamp) {
+     readRecordFailNum();
+     final long elapsedMs = System.currentTimeMillis() - beginPullMsgTimestamp;
+     readRecordFailRT(elapsedMs);
+ }
+
+ /**
+  * Adds {@code errorPullRT} to the global failed-read response-time statistic and to the one keyed by this task's id.
+  */
+ private void readRecordFailRT(long errorPullRT) {
+     connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
+     final String taskKey = id().toString();
+     connectStatsManager.incSinkRecordReadFailRT(taskKey, errorPullRT);
+ }
+
+ /**
+  * Increments the global failed-read counter and the one keyed by this task's id.
+  */
+ private void readRecordFailNum() {
+     connectStatsManager.incSinkRecordReadTotalFailNums();
+     final String taskKey = id().toString();
+     connectStatsManager.incSinkRecordReadFailNums(taskKey);
+ }
+
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
index e698d1d..6a774ad 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
@@ -150,7 +150,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
}
messageQueuesStateMap.put(messageQueue, QueueState.PAUSE);
}
- this.workerSinkTask.stop();
+ this.workerSinkTask.close();
}
@Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 6711dc5..746c871 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -22,10 +22,8 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.source.SourceTask;
-import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordConverter;
-import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.RecordPosition;
import io.openmessaging.connector.api.errors.ConnectException;
@@ -36,30 +34,40 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.errors.ErrorReporter;
import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.errors.ToleranceType;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.utils.Utils;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -68,39 +76,26 @@ import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTas
/**
* A wrapper of {@link SourceTask} for runtime.
*/
-public class WorkerSourceTask implements WorkerTask {
+public class WorkerSourceTask extends WorkerTask {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
- /**
- * Connector name of current task.
- */
- private String connectorName;
-
+ private static final long SEND_FAILED_BACKOFF_MS = 100;
/**
* The implements of the source task.
*/
- private SourceTask sourceTask;
-
- /**
- * The configs of current source task.
- */
- private ConnectKeyValue taskConfig;
+ private final SourceTask sourceTask;
- /**
- * Atomic state variable
- */
- private AtomicReference<WorkerTaskState> state;
+ protected final WorkerSourceTaskContext sourceTaskContext;
/**
* Used to read the position of source data source.
*/
- private OffsetStorageReader offsetStorageReader;
+ private final OffsetStorageReader offsetStorageReader;
/**
* Used to write the position of source data source.
*/
- private PositionStorageWriter positionStorageWriter;
+ private final PositionStorageWriter positionStorageWriter;
/**
* A RocketMQ producer to send message to dest MQ.
@@ -110,19 +105,21 @@ public class WorkerSourceTask implements WorkerTask {
/**
* A converter to parse source data entry to byte[].
*/
- private RecordConverter recordConverter;
-
- private final AtomicReference<WorkerState> workerState;
+ private final RecordConverter recordConverter;
- private ConnectStatsManager connectStatsManager;
-
- private ConnectStatsService connectStatsService;
+ /**
+ * stat connect
+ */
+ private final ConnectStatsManager connectStatsManager;
+ private final ConnectStatsService connectStatsService;
+ private final CountDownLatch stopRequestedLatch;
+ private final AtomicReference<Throwable> producerSendException;
private List<ConnectRecord> toSendRecord;
- private TransformChain<ConnectRecord> transformChain;
- private RetryWithToleranceOperator retryWithToleranceOperator;
+ private volatile RecordOffsetManagement.CommittableOffsets committableOffsets;
+ private final RecordOffsetManagement offsetManagement;
/**
* The property of message in WHITE_KEY_SET don't need add a connect prefix
*/
@@ -133,8 +130,10 @@ public class WorkerSourceTask implements WorkerTask {
WHITE_KEY_SET.add(MessageConst.PROPERTY_TAGS);
}
- public WorkerSourceTask(String connectorName,
+ public WorkerSourceTask(ConnectConfig workerConfig,
+ ConnectorTaskId id,
SourceTask sourceTask,
+ ClassLoader classLoader,
ConnectKeyValue taskConfig,
PositionManagementService positionManagementService,
RecordConverter recordConverter,
@@ -144,248 +143,277 @@ public class WorkerSourceTask implements WorkerTask {
ConnectStatsService connectStatsService,
TransformChain<ConnectRecord> transformChain,
RetryWithToleranceOperator retryWithToleranceOperator) {
- this.connectorName = connectorName;
+ super(workerConfig, id, classLoader, taskConfig, retryWithToleranceOperator, transformChain, workerState);
+
this.sourceTask = sourceTask;
- this.taskConfig = taskConfig;
- this.offsetStorageReader = new PositionStorageReaderImpl(connectorName, positionManagementService);
- this.positionStorageWriter = new PositionStorageWriter(connectorName, positionManagementService);
+ this.offsetStorageReader = new PositionStorageReaderImpl(id.connector(), positionManagementService);
+ this.positionStorageWriter = new PositionStorageWriter(id.connector(), positionManagementService);
this.producer = producer;
this.recordConverter = recordConverter;
- this.state = new AtomicReference<>(WorkerTaskState.NEW);
- this.workerState = workerState;
this.connectStatsManager = connectStatsManager;
this.connectStatsService = connectStatsService;
- this.transformChain = transformChain;
- this.retryWithToleranceOperator = retryWithToleranceOperator;
- this.transformChain.retryWithToleranceOperator(this.retryWithToleranceOperator);
- }
-
- /**
- * Start a source task, and send data entry to MQ cyclically.
- */
- @Override
- public void run() {
- try {
- producer.start();
- log.info("Source task producer start.");
- state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
- sourceTask.init(new SourceTaskContext() {
-
- @Override
- public OffsetStorageReader offsetStorageReader() {
- return offsetStorageReader;
- }
-
- @Override
- public String getConnectorName() {
- return taskConfig.getString(RuntimeConfigDefine.CONNECTOR_ID);
- }
-
- @Override
- public String getTaskName() {
- return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
- }
-
- /**
- * Get the configurations of current task.
- *
- * @return the configuration of current task.
- */
- @Override
- public KeyValue configs() {
- return taskConfig;
- }
- });
- sourceTask.start(taskConfig);
- state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
- log.info("Source task start, config:{}", JSON.toJSONString(taskConfig));
- while (WorkerState.STARTED == workerState.get() && WorkerTaskState.RUNNING == state.get()) {
- if (CollectionUtils.isEmpty(toSendRecord)) {
- try {
- toSendRecord = poll();
- if (null != toSendRecord && toSendRecord.size() > 0) {
- connectStatsManager.incSourceRecordPollTotalNums(toSendRecord.size());
- connectStatsManager.incSourceRecordPollNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID), toSendRecord.size());
- sendRecord();
- }
- } catch (RetriableException e) {
- connectStatsManager.incSourceRecordPollTotalFailNums();
- connectStatsManager.incSourceRecordPollFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- log.error("Source task RetriableException exception", e);
- } catch (Exception e) {
- connectStatsManager.incSourceRecordPollTotalFailNums();
- connectStatsManager.incSourceRecordPollFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- log.error("Source task Exception exception", e);
- state.set(WorkerTaskState.ERROR);
- } finally {
- // record source poll times
- connectStatsManager.incSourceRecordPollTotalTimes();
- }
- }
- AtomicLong atomicLong = connectStatsService.singleSourceTaskTimesTotal(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- if (null != atomicLong) {
- atomicLong.addAndGet(toSendRecord == null ? 0 : toSendRecord.size());
- }
- }
- sourceTask.stop();
- state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED);
- log.info("Source task stop, config:{}", JSON.toJSONString(taskConfig));
- } catch (Exception e) {
- log.error("Run task failed., task config: " + JSON.toJSONString(taskConfig), e);
- state.set(WorkerTaskState.ERROR);
- } finally {
- if (producer != null) {
- producer.shutdown();
- log.info("Source task producer shutdown. task config {}", JSON.toJSONString(taskConfig));
- }
- }
+ this.sourceTaskContext = new WorkerSourceTaskContext(offsetStorageReader, this, taskConfig);
+ this.stopRequestedLatch = new CountDownLatch(1);
+ this.producerSendException = new AtomicReference<>();
+ this.offsetManagement = new RecordOffsetManagement();
+ this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY;
}
private List<ConnectRecord> poll() throws InterruptedException {
- List<ConnectRecord> connectRecordList = null;
try {
- connectRecordList = sourceTask.poll();
- if (CollectionUtils.isEmpty(connectRecordList)) {
+ List<ConnectRecord> connectRecords = sourceTask.poll();
+ if (CollectionUtils.isEmpty(connectRecords)) {
return null;
}
- List<ConnectRecord> connectRecordList1 = new ArrayList<>(32);
- for (ConnectRecord connectRecord : connectRecordList) {
-
- retryWithToleranceOperator.sourceRecord(connectRecord);
-
- ConnectRecord connectRecord1 = this.transformChain.doTransforms(connectRecord);
- if (null != connectRecord1 && !retryWithToleranceOperator.failed()) {
- connectRecordList1.add(connectRecord1);
- }
- }
- return connectRecordList1;
+ return connectRecords;
} catch (RetriableException e) {
log.error("Source task RetriableException exception, taskconfig {}", JSON.toJSONString(taskConfig), e);
return null;
}
}
+
@Override
- public void stop() {
- state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
- Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
+ public void close() {
+ producer.shutdown();
+ stopRequestedLatch.countDown();
Utils.closeQuietly(transformChain, "transform chain");
- log.warn("Stop a task success.");
+ Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
+ Utils.closeQuietly(positionStorageWriter, "position storage writer");
}
- @Override
- public void cleanup() {
- log.info("Cleaning a task, current state {}, destination state {}", state.get().name(), WorkerTaskState.TERMINATED.name());
- if (state.compareAndSet(WorkerTaskState.STOPPED, WorkerTaskState.TERMINATED) ||
- state.compareAndSet(WorkerTaskState.ERROR, WorkerTaskState.TERMINATED)) {
- log.info("Cleaning a task success");
- } else {
- log.error("[BUG] cleaning a task but it's not in STOPPED or ERROR state");
+ /**
+  * Folds the offsets that offsetManagement currently reports as committable into the
+  * committableOffsets field. The snapshot is taken outside the lock; only the field
+  * update runs under this object's monitor.
+  */
+ protected void updateCommittableOffsets() {
+ RecordOffsetManagement.CommittableOffsets newOffsets = offsetManagement.committableOffsets();
+ synchronized (this) {
+ this.committableOffsets = this.committableOffsets.updatedWith(newOffsets);
}
}
+
+ /**
+  * Registers the record's position with the offset management before the record is sent.
+  * Throws first if an earlier asynchronous send already failed.
+  *
+  * @param record the record about to be sent
+  * @return the handle of the submitted position, always present
+  */
+ protected Optional<RecordOffsetManagement.SubmittedPosition> prepareToSendRecord(ConnectRecord record) {
+     maybeThrowProducerSendException();
+     final RecordOffsetManagement.SubmittedPosition submitted = this.offsetManagement.submitRecord(record.getPosition());
+     return Optional.of(submitted);
+ }
+
+
/**
* Send list of sourceDataEntries to MQ.
*/
- private void sendRecord() throws InterruptedException, RemotingException, MQClientException {
- for (ConnectRecord sourceDataEntry : toSendRecord) {
- RecordPosition position = sourceDataEntry.getPosition();
- RecordOffset offset = position.getOffset();
-
- Message sourceMessage = new Message();
- String topic = null;
- topic = taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME);
- if (StringUtils.isBlank(topic)) {
- RecordPosition recordPosition = sourceDataEntry.getPosition();
- if (null == recordPosition) {
- log.error("connect-topicname config is null and recordPosition is null , lack of topic config");
- return;
- }
- RecordPartition partition = recordPosition.getPartition();
- if (null == partition) {
- log.error("connect-topicname config is null and partition is null , lack of topic config");
- return;
- }
- Map<String, ?> partitionMap = partition.getPartition();
- if (null == partitionMap) {
- log.error("connect-topicname config is null and partitionMap is null , lack of topic config");
- return;
- }
- Object o = partitionMap.get(TOPIC);
- if (null == o) {
- log.error("connect-topicname config is null and partitionMap.get is null , lack of topic config");
- return;
- }
- topic = (String) o;
- }
- if (StringUtils.isBlank(topic)) {
- throw new ConnectException("source connect lack of topic config");
- }
- sourceMessage.setTopic(topic);
- // converter
- if (recordConverter == null) {
- final byte[] messageBody = JSON.toJSONString(sourceDataEntry, SerializerFeature.DisableCircularReferenceDetect, SerializerFeature.WriteMapNullValue).getBytes();
- if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
- log.error("Send record, message size is greater than {} bytes, sourceDataEntry: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(sourceDataEntry));
- continue;
- }
- sourceMessage.setBody(messageBody);
- } else {
- String finalTopic = topic;
- byte[] messageBody = retryWithToleranceOperator.execute(() -> recordConverter.fromConnectData(finalTopic, sourceDataEntry.getSchema(), sourceDataEntry.getData()),
- ErrorReporter.Stage.CONVERTER, recordConverter.getClass());
- if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
- log.error("Send record, message size is greater than {} bytes, sourceDataEntry: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(sourceDataEntry));
- continue;
- }
- sourceMessage.setBody(messageBody);
+ private Boolean sendRecord() throws InterruptedException {
+ int processed = 0;
+ for (ConnectRecord preTransformRecord : toSendRecord) {
+ retryWithToleranceOperator.sourceRecord(preTransformRecord);
+ ConnectRecord record = transformChain.doTransforms(preTransformRecord);
+ String topic = maybeCreateAndGetTopic(record);
+ Message sourceMessage = convertTransformedRecord(topic, record);
+ if (sourceMessage == null || retryWithToleranceOperator.failed()) {
+ // commit record
+ recordFailed(preTransformRecord);
+ continue;
}
- // put extend msg property
- putExtendMsgProperty(sourceDataEntry, sourceMessage, topic);
-
+ log.trace("{} Appending record to the topic {} , value {}", this, topic, record.getData());
+ /**prepare to send record*/
+ Optional<RecordOffsetManagement.SubmittedPosition> submittedRecordPosition = prepareToSendRecord(preTransformRecord);
try {
producer.send(sourceMessage, new SendCallback() {
@Override
- public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) {
+ public void onSuccess(SendResult result) {
log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
- connectStatsManager.incSourceRecordWriteTotalNums();
- connectStatsManager.incSourceRecordWriteNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- RecordPartition partition = position.getPartition();
- try {
- if (null != partition && null != position) {
- Map<String, String> offsetMap = (Map<String, String>) offset.getOffset();
- offsetMap.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, String.valueOf(sourceDataEntry.getTimestamp()));
- positionStorageWriter.putPosition(partition, offset);
- }
-
- } catch (Exception e) {
- log.error("Source task save position info failed. partition {}, offset {}", JSON.toJSONString(partition), JSON.toJSONString(offset), e);
- }
+ // metrics
+ incWriteRecordStat();
+ // commit record for custom
+ recordSent(preTransformRecord, sourceMessage, result);
+ // ack record position
+ submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
}
@Override
public void onException(Throwable throwable) {
log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(sourceMessage), throwable);
- connectStatsManager.incSourceRecordWriteTotalFailNums();
- connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+ // fail record metrics
+ inWriteRecordFail();
+ // record send failed
+ recordSendFailed(false, sourceMessage, preTransformRecord, throwable);
}
});
- } catch (MQClientException e) {
+ } catch (RetriableException e) {
+ log.warn("{} Failed to send record to topic '{}'. Backing off before retrying: ",
+ this, sourceMessage.getTopic(), e);
+ // Intercepted as successfully sent, used to continue sending next time
+ toSendRecord = toSendRecord.subList(processed, toSendRecord.size());
+ // remove pre submit position, for retry
+ submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::remove);
+ return false;
+ } catch (MQClientException | RemotingException e) {
log.error("Send message MQClientException. message: {}, error info: {}.", sourceMessage, e);
- connectStatsManager.incSourceRecordWriteTotalFailNums();
- connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- } catch (RemotingException e) {
- log.error("Send message RemotingException. message: {}, error info: {}.", sourceMessage, e);
- connectStatsManager.incSourceRecordWriteTotalFailNums();
- connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+ inWriteRecordFail();
+ recordSendFailed(true, sourceMessage, preTransformRecord, e);
} catch (InterruptedException e) {
log.error("Send message InterruptedException. message: {}, error info: {}.", sourceMessage, e);
- connectStatsManager.incSourceRecordWriteTotalFailNums();
- connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+ inWriteRecordFail();
throw e;
}
+ processed++;
}
toSendRecord = null;
+ return true;
+ }
+
+ /**
+  * Pre-poll check: throws if an earlier asynchronous send has stored a failure.
+  */
+ private void prepareToPollTask() {
+ maybeThrowProducerSendException();
+ }
+
+ /**
+  * Throws a {@link ConnectException} wrapping the failure stored in producerSendException,
+  * if there is one; otherwise does nothing.
+  */
+ private void maybeThrowProducerSendException() {
+     final Throwable sendFailure = producerSendException.get();
+     if (sendFailure == null) {
+         return;
+     }
+     throw new ConnectException("Unrecoverable exception from producer send callback", sendFailure);
+ }
+
+ /**
+  * Handles a failed send.
+  * <ul>
+  *   <li>synchronous failure: rethrown as {@link ConnectException};</li>
+  *   <li>asynchronous failure with tolerance {@code ALL}: reported to the tolerance operator and
+  *       the record is committed without a send result;</li>
+  *   <li>any other asynchronous failure: stored in producerSendException (first failure wins).</li>
+  * </ul>
+  *
+  * @param synchronous        whether the failure was raised by the send call itself
+  * @param sourceMessage      message that could not be sent
+  * @param preTransformRecord record the message was built from
+  * @param e                  the failure
+  */
+ private void recordSendFailed(boolean synchronous, Message sourceMessage, ConnectRecord preTransformRecord, Throwable e) {
+     if (synchronous) {
+         throw new ConnectException("Unrecoverable exception trying to send", e);
+     }
+     final String topic = sourceMessage.getTopic();
+     final boolean tolerateAll = retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL;
+     if (!tolerateAll) {
+         log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
+         log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
+         producerSendException.compareAndSet(null, e);
+         return;
+     }
+     // ignore all error
+     log.trace("Ignoring failed record send: {} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
+     retryWithToleranceOperator.executeFailed(ErrorReporter.Stage.ROCKETMQ_PRODUCE, WorkerSourceTask.class, preTransformRecord, e);
+     commitTaskRecord(preTransformRecord, null);
+ }
+
+
+ /**
+  * Tells the source task about a record that is not going to be sent, by committing it
+  * without a send result.
+  *
+  * @param record the record to commit
+  */
+ private void recordFailed(ConnectRecord record) {
+ commitTaskRecord(record, null);
+ }
+
+
+ /**
+  * Tells the source task that a record was sent, by committing it together with the send result.
+  *
+  * @param preTransformRecord the record to commit
+  * @param sourceMessage      the message that was sent (not used by this method)
+  * @param result             the send result passed on to commitTaskRecord
+  */
+ private void recordSent(
+ ConnectRecord preTransformRecord,
+ Message sourceMessage,
+ SendResult result) {
+ commitTaskRecord(preTransformRecord, result);
+ }
+
+ /**
+  * Calls {@code sourceTask.commit} for a record. When a send result is available its fields are
+  * passed along as properties; otherwise {@code null} is passed.
+  */
+ private void commitTaskRecord(ConnectRecord preTransformRecord, SendResult result) {
+     final ConnectKeyValue sendMetadata = result == null ? null : describeSendResult(result);
+     sourceTask.commit(preTransformRecord, sendMetadata == null ? null : sendMetadata.getProperties());
+ }
+
+ /**
+  * Copies the fields of a send result into a key/value container.
+  */
+ private static ConnectKeyValue describeSendResult(SendResult result) {
+     final ConnectKeyValue metadata = new ConnectKeyValue();
+     metadata.put("send.status", result.getSendStatus().name());
+     metadata.put("msg.id", result.getMsgId());
+     metadata.put("topic", result.getMessageQueue().getTopic());
+     metadata.put("broker.name", result.getMessageQueue().getBrokerName());
+     metadata.put("queue.id", result.getMessageQueue().getQueueId());
+     metadata.put("queue.offset", result.getQueueOffset());
+     metadata.put("transaction.id", result.getTransactionId());
+     metadata.put("offset.msg.id", result.getOffsetMsgId());
+     metadata.put("region.id", result.getRegionId());
+     return metadata;
+ }
+
+
+ /**
+  * Convert the transformed source record into the message handed to the producer.
+  *
+  * @param topic  topic set on the message
+  * @param record transformed record; may be {@code null}
+  * @return the message, or {@code null} when {@code record} is {@code null}, when no body could be
+  *         produced, or when the tolerance operator reports a failure
+  */
+ protected Message convertTransformedRecord(final String topic, ConnectRecord record) {
+     if (record == null) {
+         return null;
+     }
+     final byte[] messageBody;
+     if (recordConverter == null) {
+         // No converter configured: the body is the fastjson rendering of the whole record.
+         messageBody = JSON.toJSONString(record, SerializerFeature.DisableCircularReferenceDetect, SerializerFeature.WriteMapNullValue).getBytes();
+     } else {
+         messageBody = retryWithToleranceOperator.execute(() -> recordConverter.fromConnectData(topic, record.getSchema(), record.getData()),
+             ErrorReporter.Stage.CONVERTER, recordConverter.getClass());
+     }
+     if (messageBody == null) {
+         // The converter call yielded no body (presumably a tolerated conversion failure); bail out
+         // here instead of hitting a NullPointerException on messageBody.length below.
+         return null;
+     }
+     if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
+         // NOTE(review): an oversized body is only logged, the message is still built and returned.
+         log.error("Send record, message size is greater than {} bytes, record: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(record));
+     }
+     Message sourceMessage = new Message();
+     sourceMessage.setTopic(topic);
+     sourceMessage.setBody(messageBody);
+
+     if (retryWithToleranceOperator.failed()) {
+         return null;
+     }
+     // put extend msg property
+     putExtendMsgProperty(record, sourceMessage, topic);
+     return sourceMessage;
+ }
+
+ /**
+  * Resolve the destination topic of a record and create the topic when it does not exist yet.
+  * <p>
+  * The {@code connect-topicname} task setting wins; when it is blank the topic is read from the
+  * {@code TOPIC} entry of the record's partition map.
+  *
+  * @param record transformed record; may be {@code null}
+  * @return the topic, or {@code null} when no topic is configured and {@code record} is {@code null}
+  * @throws ConnectException when no topic can be determined for a non-null record
+  */
+ private String maybeCreateAndGetTopic(ConnectRecord record) {
+     String topic = taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME);
+     if (StringUtils.isBlank(topic)) {
+         if (record == null) {
+             // Nothing to derive a topic from. sendRecord skips a null record right after this call,
+             // so returning null avoids a NullPointerException on record.getPosition().
+             return null;
+         }
+         topic = topicFromPartition(record);
+     }
+     if (StringUtils.isBlank(topic)) {
+         throw new ConnectException("source connect lack of topic config");
+     }
+     // Create the topic only when it is missing (the previous condition was inverted).
+     if (!ConnectUtil.isTopicExist(workerConfig, topic)) {
+         ConnectUtil.createTopic(workerConfig, new TopicConfig(topic));
+     }
+     return topic;
+ }
+
+ /**
+  * Reads the topic from the record's partition map; logs and returns {@code null} as soon as one
+  * link of the chain (position, partition, map, entry) is missing.
+  */
+ private String topicFromPartition(ConnectRecord record) {
+     RecordPosition recordPosition = record.getPosition();
+     if (null == recordPosition) {
+         log.error("connect-topicname config is null and recordPosition is null , lack of topic config");
+         return null;
+     }
+     RecordPartition partition = recordPosition.getPartition();
+     if (null == partition) {
+         log.error("connect-topicname config is null and partition is null , lack of topic config");
+         return null;
+     }
+     Map<String, ?> partitionMap = partition.getPartition();
+     if (null == partitionMap) {
+         log.error("connect-topicname config is null and partitionMap is null , lack of topic config");
+         return null;
+     }
+     Object o = partitionMap.get(TOPIC);
+     if (null == o) {
+         log.error("connect-topicname config is null and partitionMap.get is null , lack of topic config");
+         return null;
+     }
+     return (String) o;
+ }
private void putExtendMsgProperty(ConnectRecord sourceDataEntry, Message sourceMessage, String topic) {
@@ -399,7 +427,6 @@ public class WorkerSourceTask implements WorkerTask {
log.info("extension keySet null.");
return;
}
-
for (String key : keySet) {
if (WHITE_KEY_SET.contains(key)) {
MessageAccessor.putProperty(sourceMessage, key, extensionKeyValues.getString(key));
@@ -410,42 +437,182 @@ public class WorkerSourceTask implements WorkerTask {
}
+ /**
+ * Starts the producer, then initializes and starts the source task.
+ */
@Override
- public WorkerTaskState getState() {
- return this.state.get();
+ protected void initializeAndStart() {
+ // The producer is started first; a start failure is rethrown as ConnectException with the original cause.
+ try {
+ producer.start();
+ } catch (MQClientException e) {
+ log.error("{} Source task producer start failed!!", this);
+ throw new ConnectException(e);
+ }
+ // The task gets its context before it is started with the task configuration.
+ sourceTask.init(sourceTaskContext);
+ sourceTask.start(taskConfig);
+ log.info("{} Source task finished initialization and start", this);
}
- @Override
- public String getConnectorName() {
- return connectorName;
+
+ /**
+  * Adds the size of a polled batch to the global poll counter and to the counter keyed by this task's id.
+  *
+  * @param numRecordsInBatch number of records returned by the poll
+  */
+ protected void recordPollReturned(int numRecordsInBatch) {
+     connectStatsManager.incSourceRecordPollTotalNums(numRecordsInBatch);
+     // Same key form as the other stat helpers; the trailing +"" was a no-op.
+     connectStatsManager.incSourceRecordPollNums(id().toString(), numRecordsInBatch);
}
+ /**
+ * execute poll and send record
+ */
@Override
- public ConnectKeyValue getTaskConfig() {
- return taskConfig;
+ protected void execute() {
+     while (isRunning()) {
+         updateCommittableOffsets();
+         try {
+             // Poll only when nothing is left over from a previous, partially sent batch.
+             if (CollectionUtils.isEmpty(toSendRecord)) {
+                 prepareToPollTask();
+                 toSendRecord = poll();
+                 if (null != toSendRecord && toSendRecord.size() > 0) {
+                     recordPollReturned(toSendRecord.size());
+                 }
+             }
+             if (CollectionUtils.isEmpty(toSendRecord)) {
+                 continue;
+             }
+             // Sending sits outside the "is empty" guard so that the remainder kept by sendRecord()
+             // after a retriable failure is sent again on the next iteration. Previously that
+             // remainder was never retried and the loop spun without polling or sending.
+             log.trace("{} About to send {} records to RocketMQ", this, toSendRecord.size());
+             if (!sendRecord()) {
+                 stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
+             }
+         } catch (InterruptedException e) {
+             // Ignore and allow to exit.
+         } catch (Exception e) {
+             try {
+                 finalOffsetCommit(true);
+             } catch (Exception offsetException) {
+                 log.error("Failed to commit offsets for already-failing task", offsetException);
+             }
+             throw e;
+         } finally {
+             // NOTE(review): this runs on every iteration, and a second time after the commit in the
+             // catch block above - confirm that is intended.
+             finalOffsetCommit(false);
+             // record source poll times
+             connectStatsManager.incSourceRecordPollTotalTimes();
+         }
+         AtomicLong atomicLong = connectStatsService.singleSourceTaskTimesTotal(id().toString());
+         if (null != atomicLong) {
+             atomicLong.addAndGet(toSendRecord == null ? 0 : toSendRecord.size());
+         }
+     }
+ }
- @Override
- public void timeout() {
- this.state.set(WorkerTaskState.ERROR);
+ /**
+  * Waits for outstanding messages (bounded by the offset-commit timeout from the worker config),
+  * refreshes the committable offsets and commits them.
+  *
+  * @param b not read by this method
+  */
+ protected void finalOffsetCommit(boolean b) {
+
+ offsetManagement.awaitAllMessages(
+ workerConfig.getOffsetCommitTimeoutMsConfig(),
+ TimeUnit.MILLISECONDS
+ );
+ updateCommittableOffsets();
+ commitOffsets();
}
- @Override
- public String toString() {
+ /**
+  * Writes the currently committable offsets to the position storage writer and flushes them.
+  *
+  * @return {@code true} when the flush completed or there was nothing to flush; {@code false} when
+  *         the flush was interrupted, failed or timed out (the flush is cancelled in those cases)
+  */
+ public boolean commitOffsets() {
+     long commitTimeoutMs = workerConfig.getOffsetCommitTimeoutMsConfig();
+     log.debug("{} Committing offsets", this);
+
+     long started = System.currentTimeMillis();
+     long timeout = started + commitTimeoutMs;
+
+     // Take the pending offsets and reset the field in one step, under the same lock used by updateCommittableOffsets().
+     RecordOffsetManagement.CommittableOffsets offsetsToCommit;
+     synchronized (this) {
+         offsetsToCommit = this.committableOffsets;
+         this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY;
+     }
+
+     // Report on the snapshot just taken. The field was reset to EMPTY above, so reading it here
+     // always produced the "no records" message.
+     if (offsetsToCommit.isEmpty()) {
+         log.debug("{} Either no records were produced by the task since the last offset commit, "
+             + "or every record has been filtered out by a transformation "
+             + "or dropped due to transformation or conversion errors.",
+             this
+         );
+         // We continue with the offset commit process here instead of simply returning immediately
+         // in order to invoke SourceTask::commit and record metrics for a successful offset commit
+     } else {
+         log.info("{} Committing offsets for {} acknowledged messages", this, offsetsToCommit.numCommittableMessages());
+         if (offsetsToCommit.hasPending()) {
+             log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+                 + "The source partition with the most pending messages is {}, with {} pending messages",
+                 this,
+                 offsetsToCommit.numUncommittableMessages(),
+                 offsetsToCommit.numDeques(),
+                 offsetsToCommit.largestDequePartition(),
+                 offsetsToCommit.largestDequeSize()
+             );
+         } else {
+             log.debug("{} There are currently no pending messages for this offset commit; "
+                 + "all messages dispatched to the task's producer since the last commit have been acknowledged",
+                 this
+             );
+         }
+     }
+
+     // write offset
+     offsetsToCommit.offsets().forEach(positionStorageWriter::writeOffset);
+
+     // begin flush
+     if (!positionStorageWriter.beginFlush()) {
+         // There was nothing in the offsets to process, but we still mark a successful offset commit.
+         long durationMillis = System.currentTimeMillis() - started;
+         log.debug("{} Finished offset commitOffsets successfully in {} ms",
+             this, durationMillis);
+         commitSourceTask();
+         return true;
+     }
+
+     Future<Void> flushFuture = positionStorageWriter.doFlush((error, key, result) -> {
+         if (error != null) {
+             log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
+         } else {
+             log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this);
+         }
+     });
+     try {
+         // Wait for whatever is left of the commit timeout, never a negative amount.
+         flushFuture.get(Math.max(timeout - System.currentTimeMillis(), 0), TimeUnit.MILLISECONDS);
+     } catch (InterruptedException e) {
+         log.warn("{} Flush of offsets interrupted, cancelling", this);
+         positionStorageWriter.cancelFlush();
+         return false;
+     } catch (ExecutionException e) {
+         log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
+         positionStorageWriter.cancelFlush();
+         return false;
+     } catch (TimeoutException e) {
+         log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this);
+         positionStorageWriter.cancelFlush();
+         return false;
+     }
- StringBuilder sb = new StringBuilder();
- sb.append("connectorName:" + connectorName)
- .append("\nConfigs:" + JSON.toJSONString(taskConfig))
- .append("\nState:" + state.get().toString());
- return sb.toString();
+     long durationMillis = System.currentTimeMillis() - started;
+     log.debug("{} Finished commitOffsets successfully in {} ms",
+         this, durationMillis);
+     commitSourceTask();
+     return true;
}
- @Override
- public Object getJsonObject() {
- HashMap obj = new HashMap<String, Object>();
- obj.put("connectorName", connectorName);
- obj.put("configs", JSON.toJSONString(taskConfig));
- obj.put("state", state.get().toString());
- return obj;
+ /**
+  * Calls the source task's commit hook. Anything it throws is logged and not propagated.
+  */
+ protected void commitSourceTask() {
+ try {
+ this.sourceTask.commit();
+ } catch (Throwable t) {
+ log.error("{} Exception thrown while calling task.commit()", this, t);
+ }
}
+
+
+ /**
+  * Increments the global failed-write counter and the one keyed by this task's id.
+  */
+ private void inWriteRecordFail() {
+     connectStatsManager.incSourceRecordWriteTotalFailNums();
+     final String taskKey = id().toString();
+     connectStatsManager.incSourceRecordWriteFailNums(taskKey);
+ }
+
+ /**
+  * Increments the global write counter and the one keyed by this task's id.
+  */
+ private void incWriteRecordStat() {
+     connectStatsManager.incSourceRecordWriteTotalNums();
+     final String taskKey = id().toString();
+     connectStatsManager.incSourceRecordWriteNums(taskKey);
+ }
+
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTaskContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTaskContext.java
new file mode 100644
index 0000000..5ab6504
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTaskContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.storage.OffsetStorageReader;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+
+/**
+ * {@link SourceTaskContext} backed by a {@link WorkerSourceTask}: exposes the task
+ * configuration, the offset reader, and the connector/task names taken from the task id.
+ */
+public class WorkerSourceTaskContext implements SourceTaskContext {
+
+    private final OffsetStorageReader reader;
+    private final WorkerSourceTask task;
+    private final ConnectKeyValue keyValue;
+
+    public WorkerSourceTaskContext(OffsetStorageReader reader,
+                                   WorkerSourceTask task,
+                                   ConnectKeyValue config) {
+        this.keyValue = config;
+        this.task = task;
+        this.reader = reader;
+    }
+
+    /**
+     * @return the configuration this context was created with
+     */
+    @Override
+    public KeyValue configs() {
+        return keyValue;
+    }
+
+    /**
+     * @return the offset reader this context was created with
+     */
+    @Override
+    public OffsetStorageReader offsetStorageReader() {
+        return this.reader;
+    }
+
+    /**
+     * @return the connector part of the owning task's id
+     */
+    @Override
+    public String getConnectorName() {
+        return this.task.id().connector();
+    }
+
+    /**
+     * @return the numeric task part of the owning task's id, rendered as a string
+     */
+    @Override
+    public String getTaskName() {
+        return String.valueOf(this.task.id().task());
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java
index 52bd26d..9ffd1c2 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java
@@ -15,24 +15,192 @@
* limitations under the License.
*/
package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.apache.rocketmq.connect.runtime.utils.CurrentTaskState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
/**
* Should we use callable here ?
*/
-public interface WorkerTask extends Runnable {
+public abstract class WorkerTask implements Runnable {
+
+ private static final Logger log = LoggerFactory.getLogger(WorkerTask.class);
+ private static final String THREAD_NAME_PREFIX = "task-thread-";
+
+ protected final ConnectConfig workerConfig;
+
+ protected final ConnectorTaskId id;
+ protected final ClassLoader loader;
+ protected final ConnectKeyValue taskConfig;
+ /**
+ * Atomic state variable
+ */
+ protected AtomicReference<WorkerTaskState> state;
+ /**
+ * worker state
+ */
+ protected final AtomicReference<WorkerState> workerState;
+ protected final RetryWithToleranceOperator retryWithToleranceOperator;
+ protected final TransformChain<ConnectRecord> transformChain;
+
+
+    /**
+     * Stores the collaborators, starts the task in {@link WorkerTaskState#NEW} and hands
+     * the retry operator to the transform chain.
+     */
+    public WorkerTask(ConnectConfig workerConfig,
+                      ConnectorTaskId id,
+                      ClassLoader loader,
+                      ConnectKeyValue taskConfig,
+                      RetryWithToleranceOperator retryWithToleranceOperator,
+                      TransformChain<ConnectRecord> transformChain,
+                      AtomicReference<WorkerState> workerState) {
+        // identity and configuration
+        this.id = id;
+        this.loader = loader;
+        this.workerConfig = workerConfig;
+        this.taskConfig = taskConfig;
+
+        // lifecycle state
+        this.workerState = workerState;
+        this.state = new AtomicReference<>(WorkerTaskState.NEW);
+
+        // record processing
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.transformChain = transformChain;
+        transformChain.retryWithToleranceOperator(retryWithToleranceOperator);
+    }
+
+ /**
+ * @return the task id passed to the constructor
+ */
+ public ConnectorTaskId id() {
+ return id;
+ }
+
+ /**
+ * @return the class loader passed to the constructor
+ */
+ public ClassLoader loader() {
+ return loader;
+ }
+
+ /**
+ * Initialize the task for execution.
+ *
+ * @param taskConfig initial configuration
+ */
+ protected void initialize(ConnectKeyValue taskConfig) {
+ // NO-op
+ }
+
+ /**
+ * Initialize and start the task.
+ */
+ protected abstract void initializeAndStart();
+
+ /**
+ * Moves the state from NEW to PENDING, runs {@link #initializeAndStart()}, then moves
+ * the state from PENDING to RUNNING. Both transitions are compare-and-set whose result
+ * is ignored, so a transition from an unexpected state is silently skipped. If
+ * {@link #initializeAndStart()} throws, the second transition is not attempted.
+ */
+ public void doInitializeAndStart() {
+ state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
+ initializeAndStart();
+ state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
+ }
+
+ /**
+ * execute poll and send record
+ */
+ protected abstract void execute();
+
+ /**
+ * Delegates to {@link #execute()}.
+ */
+ private void doExecute() {
+ execute();
+ }
+
+ /**
+ * Reads the current task state.
+ *
+ * @return the value currently held by the atomic {@code state} reference
+ */
+ public WorkerTaskState getState() {
+ return this.state.get();
+ }
+
+ /**
+ * @return {@code true} only when the worker state is STARTED and this task's state is RUNNING
+ */
+ protected boolean isRunning() {
+ return WorkerState.STARTED == workerState.get() && WorkerTaskState.RUNNING == state.get();
+ }
+
+ /**
+ * @return the negation of {@link #isRunning()}; this is true for every state other than
+ * "worker STARTED and task RUNNING", not only while the task is shutting down
+ */
+ protected boolean isStopping() {
+ return !isRunning();
+ }
+
+ /**
+ * close resources
+ */
+ protected abstract void close();
+
+ /**
+ * Moves the state from RUNNING to STOPPING, calls {@link #close()}, then moves the state
+ * from STOPPING to STOPPED. Both transitions are compare-and-set whose result is ignored;
+ * {@link #close()} is called regardless of whether the first transition happened.
+ * Any throwable is logged and rethrown, leaving the state as it was at that point.
+ */
+ public void doClose() {
+ try {
+ state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
+ close();
+ state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED);
+ } catch (Throwable t) {
+ log.error("{} Task threw an uncaught and unrecoverable exception during shutdown", this, t);
+ throw t;
+ }
+ }
+
+    /**
+     * Moves a STOPPED or ERROR task to TERMINATED. Any other current state is reported
+     * as a bug in the log and left untouched.
+     */
+    public void cleanup() {
+        log.info("Cleaning a task, current state {}, destination state {}", state.get().name(), WorkerTaskState.TERMINATED.name());
+        // the ERROR transition is only attempted when the STOPPED one did not apply
+        final boolean terminated = state.compareAndSet(WorkerTaskState.STOPPED, WorkerTaskState.TERMINATED)
+            || state.compareAndSet(WorkerTaskState.ERROR, WorkerTaskState.TERMINATED);
+        if (!terminated) {
+            log.error("[BUG] cleaning a task but it's not in STOPPED or ERROR state");
+            return;
+        }
+        log.info("Cleaning a task success");
+    }
+
+
+ /**
+ * @return the task configuration passed to the constructor
+ */
+ public ConnectKeyValue currentTaskConfig() {
+ return taskConfig;
+ }
- public WorkerTaskState getState();
+ /**
+ * Builds a snapshot of this task for reporting.
+ *
+ * @return a new {@link CurrentTaskState} built from the connector name of {@link #id()},
+ * the task configuration and the state at the time of the call
+ */
+ public CurrentTaskState currentTaskState() {
+ return new CurrentTaskState(id().connector(), taskConfig, state.get());
+ }
- public void stop();
- public void cleanup();
+ /**
+ * Runs the task lifecycle: initialize and start, execute, and finally close.
+ * Any throwable from the first two steps is logged and rethrown; {@link #doClose()}
+ * runs in the {@code finally} block whether or not they succeeded.
+ */
+ private void doRun() throws InterruptedException {
+ try {
+ doInitializeAndStart();
+ // while poll
+ doExecute();
+ } catch (Throwable t) {
+ log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t);
+ throw t;
+ } finally {
+ doClose();
+ }
+ }
- public String getConnectorName();
+    /**
+     * Runs the task on the calling thread. The thread is renamed to
+     * {@code task-thread-<id>} for the duration of the run and its previous name is
+     * restored afterwards.
+     *
+     * <p>On failure the task is marked via {@link #onFailure(Throwable)} and the original
+     * throwable is propagated: errors and runtime exceptions are rethrown as-is, and a
+     * checked exception is wrapped in an {@link IllegalStateException} with the original
+     * as its cause. An unconditional cast to {@link Error} would replace every non-error
+     * failure with a {@link ClassCastException} and hide the real cause.
+     */
+    @Override
+    public void run() {
+        final Thread current = Thread.currentThread();
+        final String savedName = current.getName();
+        try {
+            current.setName(THREAD_NAME_PREFIX + id);
+            doRun();
+        } catch (Throwable t) {
+            onFailure(t);
+            if (t instanceof Error) {
+                throw (Error) t;
+            }
+            if (t instanceof RuntimeException) {
+                throw (RuntimeException) t;
+            }
+            if (t instanceof InterruptedException) {
+                // keep the interrupt visible to whoever owns this thread
+                current.interrupt();
+            }
+            throw new IllegalStateException("Task " + id + " failed with a checked exception", t);
+        } finally {
+            current.setName(savedName);
+        }
+    }
- public ConnectKeyValue getTaskConfig();
+ /**
+ * Marks the task as failed by setting its state to ERROR unconditionally.
+ *
+ * @param t the failure; not inspected here, and {@code null} is passed by {@link #timeout()}
+ */
+ public void onFailure(Throwable t) {
+ synchronized (this) {
+ state.set(WorkerTaskState.ERROR);
+ }
+ }
- public Object getJsonObject();
+ /**
+ * Logs that stopping the task timed out and marks the task as failed
+ * through {@link #onFailure(Throwable)} with a {@code null} cause.
+ */
+ public void timeout() {
+ log.error("Worker task stop is timeout !!!");
+ onFailure(null);
+ }
- public void timeout();
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java
index 2125fc6..89cd15a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java
@@ -48,6 +48,12 @@ public interface ErrorReporter extends AutoCloseable {
*/
TASK_POLL,
+ /**
+ * rocketmq produce
+ */
+ ROCKETMQ_PRODUCE,
+
+
/**
* sink put
*/
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java
index b0fa283..e331232 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -75,6 +75,21 @@ public class RetryWithToleranceOperator implements AutoCloseable {
}
}
+ /**
+ * Records a failure for the given record: marks this operator as failed, stores the
+ * record, stage, executing class and error in the processing context, and reports it.
+ *
+ * @param stage stage in which the failure happened
+ * @param executingClass class that was executing when the failure happened
+ * @param sourceRecord record being processed
+ * @param error the failure
+ * @throws ConnectException wrapping {@code error} when {@code withinToleranceLimits()} is false after reporting
+ */
+ public synchronized void executeFailed(ErrorReporter.Stage stage,
+ Class<?> executingClass,
+ ConnectRecord sourceRecord,
+ Throwable error) {
+ markAsFailed();
+ context.sourceRecord(sourceRecord);
+ context.currentContext(stage, executingClass);
+ context.error(error);
+ context.report();
+ if (!withinToleranceLimits()) {
+ throw new ConnectException("Tolerance exceeded in Source Worker error handler", error);
+ }
+ }
+
+
/**
* Execute the recoverable operation. If the operation is already in a failed state, then simply return
* with the existing failure.
@@ -232,6 +247,9 @@ public class RetryWithToleranceOperator implements AutoCloseable {
return this.context.error();
}
+ /**
+ * @return the value of this operator's {@code toleranceType} field
+ */
+ public ToleranceType getErrorToleranceType() {
+ return toleranceType;
+ }
@Override
public void close() {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
index 58e4e7e..1eda51f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
@@ -239,7 +239,7 @@ public class RestHandler {
private Set<Object> convertWorkerTaskToString(Set<Runnable> tasks) {
Set<Object> result = new HashSet<>();
for (Runnable task : tasks) {
- result.add(((WorkerTask) task).getJsonObject());
+ result.add(((WorkerTask) task).currentTaskState());
}
return result;
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
new file mode 100644
index 0000000..45f641c
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.connector.Connector;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
+import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Base class for config management services. It holds the connector config store and the
+ * task-config recomputation that {@code ConfigManagementServiceImpl} and
+ * {@code MemoryConfigManagementServiceImpl} previously duplicated; subclasses supply the
+ * task-config storage through {@link #putTaskConfigs(String, List)}.
+ */
+public abstract class AbstractConfigManagementService implements ConfigManagementService {
+    /**
+     * Current connector configs in the store.
+     */
+    protected KeyValueStore<String, ConnectKeyValue> connectorKeyValueStore;
+
+    /**
+     * Asks the connector for up to {@code max-task} task configs, decorates each one with
+     * runtime keys (task id, task class, update timestamp, topic names, transform settings
+     * and, for direct connectors, the task type and source/sink task classes) and stores
+     * the result through {@link #putTaskConfigs(String, List)}.
+     *
+     * @param connectorName    name of the connector whose tasks are recomputed
+     * @param connector        connector instance that produces the raw task configs
+     * @param currentTimestamp value written as the update timestamp of every task config
+     * @param configs          connector configuration the runtime keys are copied from
+     * @throws NullPointerException if no config is stored for {@code connectorName}
+     */
+    @Override
+    public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, ConnectKeyValue configs) {
+        int maxTask = configs.getInt(RuntimeConfigDefine.MAX_TASK, 1);
+        ConnectKeyValue connectConfig = connectorKeyValueStore.get(connectorName);
+        if (connectConfig == null) {
+            // same exception type as the implicit dereference below would raise, but with context
+            throw new NullPointerException("No connector config stored for connector '" + connectorName + "'");
+        }
+        boolean directEnable = Boolean.parseBoolean(connectConfig.getString(RuntimeConfigDefine.CONNECTOR_DIRECT_ENABLE));
+        List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
+        List<ConnectKeyValue> convertedConfigs = new ArrayList<>();
+        int taskId = 0;
+        for (KeyValue keyValue : taskConfigs) {
+            ConnectKeyValue newKeyValue = new ConnectKeyValue();
+            for (String key : keyValue.keySet()) {
+                newKeyValue.put(key, keyValue.getString(key));
+            }
+            if (directEnable) {
+                newKeyValue.put(RuntimeConfigDefine.TASK_TYPE, Worker.TaskType.DIRECT.name());
+                newKeyValue.put(RuntimeConfigDefine.SOURCE_TASK_CLASS, connectConfig.getString(RuntimeConfigDefine.SOURCE_TASK_CLASS));
+                newKeyValue.put(RuntimeConfigDefine.SINK_TASK_CLASS, connectConfig.getString(RuntimeConfigDefine.SINK_TASK_CLASS));
+            }
+            // task ids are assigned in the order the connector returned the configs
+            newKeyValue.put(RuntimeConfigDefine.TASK_ID, taskId);
+            newKeyValue.put(RuntimeConfigDefine.TASK_CLASS, connector.taskClass().getName());
+            newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
+
+            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
+            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAMES));
+            // copy every transform-related key of the connector config to the task
+            Set<String> connectConfigKeySet = configs.keySet();
+            for (String connectConfigKey : connectConfigKeySet) {
+                if (connectConfigKey.startsWith(RuntimeConfigDefine.TRANSFORMS)) {
+                    newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
+                }
+            }
+            convertedConfigs.add(newKeyValue);
+            taskId++;
+        }
+        putTaskConfigs(connectorName, convertedConfigs);
+    }
+
+    /**
+     * Stores the recomputed task configs of a connector.
+     *
+     * @param connectorName connector the task configs belong to
+     * @param configs       decorated task configs, one per task
+     */
+    protected abstract void putTaskConfigs(String connectorName, List<ConnectKeyValue> configs);
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 0ee1655..1461f5e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -46,13 +46,13 @@ import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallba
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConfigManagementServiceImpl implements ConfigManagementService {
+public class ConfigManagementServiceImpl extends AbstractConfigManagementService {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
- /**
- * Current connector configs in the store.
- */
- private KeyValueStore<String, ConnectKeyValue> connectorKeyValueStore;
+// /**
+// * Current connector configs in the store.
+// */
+// private KeyValueStore<String, ConnectKeyValue> connectorKeyValueStore;
/**
* Current task configs in the store.
@@ -196,35 +196,7 @@ public class ConfigManagementServiceImpl implements ConfigManagementService {
@Override
public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, ConnectKeyValue configs) {
- int maxTask = configs.getInt(RuntimeConfigDefine.MAX_TASK, 1);
- ConnectKeyValue connectConfig = connectorKeyValueStore.get(connectorName);
- boolean directEnable = Boolean.parseBoolean(connectConfig.getString(RuntimeConfigDefine.CONNECTOR_DIRECT_ENABLE));
- List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
- List<ConnectKeyValue> converterdConfigs = new ArrayList<>();
- for (KeyValue keyValue : taskConfigs) {
- ConnectKeyValue newKeyValue = new ConnectKeyValue();
- for (String key : keyValue.keySet()) {
- newKeyValue.put(key, keyValue.getString(key));
- }
- if (directEnable) {
- newKeyValue.put(RuntimeConfigDefine.TASK_TYPE, Worker.TaskType.DIRECT.name());
- newKeyValue.put(RuntimeConfigDefine.SOURCE_TASK_CLASS, connectConfig.getString(RuntimeConfigDefine.SOURCE_TASK_CLASS));
- newKeyValue.put(RuntimeConfigDefine.SINK_TASK_CLASS, connectConfig.getString(RuntimeConfigDefine.SINK_TASK_CLASS));
- }
- newKeyValue.put(RuntimeConfigDefine.TASK_CLASS, connector.taskClass().getName());
- newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
-
- newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
- newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAMES));
- Set<String> connectConfigKeySet = configs.keySet();
- for (String connectConfigKey : connectConfigKeySet) {
- if (connectConfigKey.startsWith(RuntimeConfigDefine.TRANSFORMS)) {
- newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
- }
- }
- converterdConfigs.add(newKeyValue);
- }
- putTaskConfigs(connectorName, converterdConfigs);
+ super.recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
sendSynchronizeConfig();
triggerListener();
}
@@ -260,7 +232,8 @@ public class ConfigManagementServiceImpl implements ConfigManagementService {
return result;
}
- private void putTaskConfigs(String connectorName, List<ConnectKeyValue> configs) {
+ @Override
+ protected void putTaskConfigs(String connectorName, List<ConnectKeyValue> configs) {
List<ConnectKeyValue> exist = taskKeyValueStore.get(connectorName);
if (null != exist && exist.size() > 0) {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index 11e35e5..c372d68 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -127,7 +127,6 @@ public class PositionManagementServiceImpl implements PositionManagementService
@Override
public RecordOffset getPosition(ExtendRecordPartition partition) {
-
return positionStore.get(partition);
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
index 0668c37..fa5425b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -23,8 +23,7 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
-import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
-import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.AbstractConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.StagingMode;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
import org.apache.rocketmq.connect.runtime.store.MemoryBasedKeyValueStore;
@@ -40,14 +39,9 @@ import java.util.Map;
/**
* memory config management service impl for standalone
*/
-public class MemoryConfigManagementServiceImpl implements ConfigManagementService {
+public class MemoryConfigManagementServiceImpl extends AbstractConfigManagementService {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
- /**
- * Current connector configs in the store.
- */
- private KeyValueStore<String, ConnectKeyValue> connectorKeyValueStore;
-
/**
* Current task configs in the store.
*/
@@ -63,7 +57,8 @@ public class MemoryConfigManagementServiceImpl implements ConfigManagementServic
public MemoryConfigManagementServiceImpl() {
}
- @Override public void initialize(ConnectConfig connectConfig, Plugin plugin) {
+ @Override
+ public void initialize(ConnectConfig connectConfig, Plugin plugin) {
this.connectorKeyValueStore = new MemoryBasedKeyValueStore<>();
this.taskKeyValueStore = new MemoryBasedKeyValueStore<>();
this.plugin = plugin;
@@ -155,35 +150,7 @@ public class MemoryConfigManagementServiceImpl implements ConfigManagementServic
@Override
public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, ConnectKeyValue configs) {
- int maxTask = configs.getInt(RuntimeConfigDefine.MAX_TASK, 1);
- ConnectKeyValue connectConfig = connectorKeyValueStore.get(connectorName);
- boolean directEnable = Boolean.parseBoolean(connectConfig.getString(RuntimeConfigDefine.CONNECTOR_DIRECT_ENABLE));
- List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
- List<ConnectKeyValue> converterdConfigs = new ArrayList<>();
- for (KeyValue keyValue : taskConfigs) {
- ConnectKeyValue newKeyValue = new ConnectKeyValue();
- for (String key : keyValue.keySet()) {
- newKeyValue.put(key, keyValue.getString(key));
- }
- if (directEnable) {
- newKeyValue.put(RuntimeConfigDefine.TASK_TYPE, Worker.TaskType.DIRECT.name());
- newKeyValue.put(RuntimeConfigDefine.SOURCE_TASK_CLASS, connectConfig.getString(RuntimeConfigDefine.SOURCE_TASK_CLASS));
- newKeyValue.put(RuntimeConfigDefine.SINK_TASK_CLASS, connectConfig.getString(RuntimeConfigDefine.SINK_TASK_CLASS));
- }
- newKeyValue.put(RuntimeConfigDefine.TASK_CLASS, connector.taskClass().getName());
- newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
-
- newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
- newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAMES));
- Set<String> connectConfigKeySet = configs.keySet();
- for (String connectConfigKey : connectConfigKeySet) {
- if (connectConfigKey.startsWith(RuntimeConfigDefine.TRANSFORMS)) {
- newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
- }
- }
- converterdConfigs.add(newKeyValue);
- }
- putTaskConfigs(connectorName, converterdConfigs);
+ super.recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
triggerListener();
}
@@ -214,7 +181,8 @@ public class MemoryConfigManagementServiceImpl implements ConfigManagementServic
return result;
}
- private void putTaskConfigs(String connectorName, List<ConnectKeyValue> configs) {
+ @Override
+ protected void putTaskConfigs(String connectorName, List<ConnectKeyValue> configs) {
List<ConnectKeyValue> exist = taskKeyValueStore.get(connectorName);
if (null != exist && exist.size() > 0) {
taskKeyValueStore.remove(connectorName);
@@ -245,7 +213,8 @@ public class MemoryConfigManagementServiceImpl implements ConfigManagementServic
return this.plugin;
}
- @Override public StagingMode getStagingMode() {
+ @Override
+ public StagingMode getStagingMode() {
return StagingMode.STANDALONE;
}
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
index c5fa1fa..5b91217 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
@@ -21,23 +21,173 @@ package org.apache.rocketmq.connect.runtime.store;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.connector.api.storage.OffsetStorageWriter;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
/**
* position storage writer
*/
-public class PositionStorageWriter {
+public class PositionStorageWriter implements OffsetStorageWriter, Closeable {
+
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
private PositionManagementService positionManagementService;
private final String namespace;
+ private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ /**
+ * Offset data in Connect format
+ */
+ private Map<ExtendRecordPartition, RecordOffset> data = new HashMap<>();
+ private Map<ExtendRecordPartition, RecordOffset> toFlush = null;
+
+ // Unique ID for each flush request to handle callbacks after timeouts
+ private long currentFlushId = 0;
+
public PositionStorageWriter(String namespace, PositionManagementService positionManagementService) {
this.namespace = namespace;
this.positionManagementService = positionManagementService;
}
- public void putPosition(RecordPartition partition, RecordOffset position) {
+ @Override
+ public void writeOffset(RecordPartition partition, RecordOffset position) {
ExtendRecordPartition extendRecordPartition = new ExtendRecordPartition(namespace, partition.getPartition());
- positionManagementService.putPosition(extendRecordPartition, position);
+ data.put(extendRecordPartition, position);
+ }
+
+    /**
+     * Buffers every partition/offset pair of {@code positions} by delegating to
+     * {@link #writeOffset(RecordPartition, RecordOffset)}.
+     *
+     * @param positions offsets to buffer, keyed by partition
+     */
+    @Override
+    public void writeOffset(Map<RecordPartition, RecordOffset> positions) {
+        positions.forEach((partition, offset) -> writeOffset(partition, offset));
+    }
+
+
+ /**
+ * @return {@code true} while a snapshot is staged in {@code toFlush}
+ */
+ private boolean isFlushing() {
+ return toFlush != null;
+ }
+
+ /**
+ * Starts a flush: the buffered offsets become the staged snapshot ({@code toFlush})
+ * and the buffer ({@code data}) is replaced by a new empty map.
+ *
+ * @return {@code false} when nothing is buffered (no snapshot is staged), {@code true} otherwise
+ * @throws ConnectException if a snapshot is already staged
+ */
+ public synchronized boolean beginFlush() {
+ if (isFlushing()) {
+ throw new ConnectException("PositionStorageWriter is already flushing");
+ }
+ if (data.isEmpty()) {
+ return false;
+ }
+ this.toFlush = this.data;
+ this.data = new HashMap<>();
+ return true;
+ }
+
+    /**
+     * Submits the staged snapshot for asynchronous writing, tagged with the current flush id.
+     *
+     * @param callback notified with the throwable when writing the offsets fails
+     * @return future of the submitted flush task; its result is always {@code null}
+     */
+    public Future<Void> doFlush(final DataSynchronizerCallback callback) {
+        return sendOffsetFuture(callback, currentFlushId);
+    }
+
+ /**
+ * Cancel a flush that has been initiated by {@link #beginFlush}.
+ * Does nothing when no snapshot is staged. Otherwise the staged snapshot is merged back
+ * into the buffer and the flush id is advanced, so a flush task still carrying the old
+ * id sees a mismatch when it checks the id.
+ */
+ public synchronized void cancelFlush() {
+ if (isFlushing()) {
+ // roll back: offsets buffered since beginFlush overwrite the staged ones for the
+ // same partition, then the merged map becomes the buffer again
+ toFlush.putAll(data);
+ data = toFlush;
+ currentFlushId++;
+ toFlush = null;
+ }
+ }
+
+    /**
+     * Wraps a {@link SendOffsetCallback} in a {@link FutureTask}, submits it to the
+     * writer's executor and returns that task as the future.
+     *
+     * @param callback callback handed to the flush task
+     * @param flushId  flush id the task is tagged with
+     * @return the submitted task
+     */
+    private Future<Void> sendOffsetFuture(DataSynchronizerCallback callback, long flushId) {
+        final SendOffsetCallback sendTask = new SendOffsetCallback(callback, flushId);
+        final FutureTask<Void> pending = new FutureTask<>(sendTask);
+        executorService.submit(pending);
+        return pending;
+    }
+
+    /**
+     * Shuts down the executor that runs the flush tasks. Tasks already submitted are
+     * still executed; no new flush can be submitted afterwards.
+     *
+     * @throws IOException never thrown by this implementation; declared by {@link Closeable}
+     */
+    @Override
+    public void close() throws IOException {
+        // the executor is a final field initialised at construction, so no null check is needed
+        executorService.shutdown();
+    }
+
+
+    /**
+     * Flush task executed on the writer's executor. It writes the staged snapshot through
+     * the position management service and then clears the staged state. The task is tagged
+     * with the flush id that was current when it was created; if that id no longer matches
+     * (the flush was cancelled in the meantime) the task neither writes nor touches the
+     * writer's state, so it cannot interfere with a newer flush.
+     */
+    private class SendOffsetCallback implements Callable<Void> {
+        private final DataSynchronizerCallback callback;
+        private final long flushId;
+
+        public SendOffsetCallback(DataSynchronizerCallback callback, long flushId) {
+            this.callback = callback;
+            this.flushId = flushId;
+        }
+
+        /**
+         * Writes, persists and synchronizes the staged offsets.
+         *
+         * @return always {@code null}
+         * @throws Exception declared by {@link Callable}; failures are reported to the callback instead
+         */
+        @Override
+        public Void call() throws Exception {
+            try {
+                final Map<ExtendRecordPartition, RecordOffset> snapshot;
+                // read the shared flush state under the same lock beginFlush/cancelFlush use
+                synchronized (PositionStorageWriter.this) {
+                    // has been canceled, or nothing is staged
+                    if (flushId != currentFlushId || toFlush == null) {
+                        return null;
+                    }
+                    snapshot = toFlush;
+                }
+                log.debug("Submitting {} entries to backing store. The offsets are: {}", snapshot.size(), snapshot);
+                positionManagementService.putPosition(snapshot);
+                positionManagementService.persist();
+                positionManagementService.synchronize();
+                synchronized (PositionStorageWriter.this) {
+                    // persist finished; leave the state alone if a cancel won the race
+                    if (flushId == currentFlushId) {
+                        toFlush = null;
+                        currentFlushId++;
+                    }
+                }
+            } catch (Throwable throwable) {
+                synchronized (PositionStorageWriter.this) {
+                    // roll back only the flush this task belongs to
+                    if (flushId == currentFlushId) {
+                        cancelFlush();
+                    }
+                }
+                this.callback.onCompletion(throwable, null, null);
+            }
+            return null;
+        }
+    }
+
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index 5f7ad28..a6902cc 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -288,18 +288,18 @@ public class ConnectUtil {
return recordPartition;
}
- public static DefaultMQPullConsumer initDefaultMQPullConsumer(ConnectConfig connectConfig, String connectorName, ConnectKeyValue keyValue, Integer taskId) {
+ public static DefaultMQPullConsumer initDefaultMQPullConsumer(ConnectConfig connectConfig, ConnectorTaskId id, ConnectKeyValue keyValue) {
RPCHook rpcHook = null;
if (connectConfig.getAclEnable()) {
rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
}
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rpcHook);
- consumer.setInstanceName(createInstance(connectorName.concat("-").concat(taskId.toString())));
+ consumer.setInstanceName(id.toString());
String taskGroupId = keyValue.getString("task-group-id");
if (StringUtils.isNotBlank(taskGroupId)) {
consumer.setConsumerGroup(taskGroupId);
} else {
- consumer.setConsumerGroup(SYS_TASK_CG_PREFIX + connectorName);
+ consumer.setConsumerGroup(SYS_TASK_CG_PREFIX + id.connector());
}
if (StringUtils.isNotBlank(connectConfig.getNamesrvAddr())) {
consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectorTaskId.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectorTaskId.java
new file mode 100644
index 0000000..25e6dc5
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectorTaskId.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.utils;
+
+import com.alibaba.fastjson.annotation.JSONCreator;
+import com.alibaba.fastjson.annotation.JSONField;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Unique ID for a single task. It combines the name of the owning connector with a task number
+ * that is unique within that connector.
+ *
+ * <p>Instances are immutable. Equality, hashing and ordering all look at the connector name first
+ * and the task number second. A {@code null} connector name is accepted by all three and sorts
+ * before any non-null name.
+ */
+public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId> {
+    private final String connector;
+    private final int task;
+
+    /**
+     * Creates a task ID. Annotated as the fastjson creator, with each argument bound to a named
+     * JSON property.
+     *
+     * @param connector name of the connector, bound to the JSON property {@code connector}
+     * @param task      task number, bound to the JSON property {@code task}
+     */
+    @JSONCreator
+    public ConnectorTaskId(@JSONField(name = "connector") String connector, @JSONField(name = "task") int task) {
+        this.connector = connector;
+        this.task = task;
+    }
+
+    /**
+     * @return the connector name this task belongs to
+     */
+    @JSONField(name = "connector")
+    public String connector() {
+        return connector;
+    }
+
+    /**
+     * @return the task number
+     */
+    // The property name is spelled out, as on connector() and on the creator parameter: this
+    // accessor does not follow the getXxx naming scheme, so the name cannot be derived from it.
+    @JSONField(name = "task")
+    public int task() {
+        return task;
+    }
+
+    /**
+     * Two IDs are equal when they are of the same class and agree on both connector name and
+     * task number.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ConnectorTaskId that = (ConnectorTaskId) o;
+        return task == that.task && Objects.equals(connector, that.connector);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = connector != null ? connector.hashCode() : 0;
+        result = 31 * result + task;
+        return result;
+    }
+
+    /**
+     * @return the connector name and the task number joined by {@code '-'}
+     */
+    @Override
+    public String toString() {
+        return connector + '-' + task;
+    }
+
+    /**
+     * Orders by connector name, then by task number.
+     *
+     * @param o the ID to compare against; must not be {@code null}
+     * @return a negative, zero or positive value as this ID sorts before, equal to or after
+     *         {@code o}
+     */
+    @Override
+    public int compareTo(ConnectorTaskId o) {
+        int connectorCmp = compareConnectors(connector, o.connector);
+        if (connectorCmp != 0) {
+            return connectorCmp;
+        }
+        return Integer.compare(task, o.task);
+    }
+
+    /**
+     * Null-safe comparison of connector names, so that ordering accepts the same instances that
+     * equals and hashCode accept. A null name sorts first.
+     */
+    private static int compareConnectors(String left, String right) {
+        if (left == null) {
+            return right == null ? 0 : -1;
+        }
+        if (right == null) {
+            return 1;
+        }
+        return left.compareTo(right);
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/CurrentTaskState.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/CurrentTaskState.java
new file mode 100644
index 0000000..401db98
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/CurrentTaskState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.utils;
+
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerTaskState;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+
+/**
+ * Holder for the current state of a task: the connector name, the task's configuration
+ * properties and its {@link WorkerTaskState}.
+ */
+public class CurrentTaskState implements Serializable {
+
+    private String connectorName;
+    private Map<String, String> config;
+    private WorkerTaskState taskState;
+
+
+    /**
+     * @param connector       name of the connector
+     * @param connectKeyValue configuration whose properties map is stored as {@code config}
+     * @param taskState       state of the task
+     */
+    public CurrentTaskState(String connector, ConnectKeyValue connectKeyValue, WorkerTaskState taskState) {
+        this.taskState = taskState;
+        this.config = connectKeyValue.getProperties();
+        this.connectorName = connector;
+    }
+
+    /**
+     * @return the connector name
+     */
+    public String getConnectorName() {
+        return this.connectorName;
+    }
+
+    /**
+     * @param connectorName the connector name to store
+     */
+    public void setConnectorName(String connectorName) {
+        this.connectorName = connectorName;
+    }
+
+    /**
+     * @return the stored configuration properties
+     */
+    public Map<String, String> getConfig() {
+        return this.config;
+    }
+
+    /**
+     * @param config the configuration properties to store
+     */
+    public void setConfig(Map<String, String> config) {
+        this.config = config;
+    }
+
+    /**
+     * @return the stored task state
+     */
+    public WorkerTaskState getTaskState() {
+        return this.taskState;
+    }
+
+    /**
+     * @param taskState the task state to store
+     */
+    public void setTaskState(WorkerTaskState taskState) {
+        this.taskState = taskState;
+    }
+
+    /**
+     * Equal when the other object is a {@code CurrentTaskState} with the same connector name,
+     * configuration and task state.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof CurrentTaskState)) {
+            return false;
+        }
+        final CurrentTaskState other = (CurrentTaskState) o;
+        if (!Objects.equals(this.connectorName, other.connectorName)) {
+            return false;
+        }
+        if (!Objects.equals(this.config, other.config)) {
+            return false;
+        }
+        return this.taskState == other.taskState;
+    }
+
+    /**
+     * Combines the three fields with the same 31-based fold that
+     * {@code Objects.hash(connectorName, config, taskState)} performs.
+     */
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 31 * hash + Objects.hashCode(this.connectorName);
+        hash = 31 * hash + Objects.hashCode(this.config);
+        hash = 31 * hash + Objects.hashCode(this.taskState);
+        return hash;
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 7931eac..529d1ba 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -45,6 +45,7 @@ import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.utils.Plugin;
import org.apache.rocketmq.connect.runtime.utils.TestUtils;
import org.junit.After;
@@ -118,8 +119,10 @@ public class WorkerTest {
RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters("TEST-CONN-" + i, connectKeyValue));
- runnables.add(new WorkerSourceTask("TEST-CONN-" + i,
+ runnables.add(new WorkerSourceTask(new ConnectConfig(),
+ new ConnectorTaskId("TEST-CONN-" + i,i),
new TestSourceTask(),
+ null,
connectKeyValue,
new TestPositionManageServiceImpl(),
new JsonConverter(),
@@ -189,7 +192,7 @@ public class WorkerTest {
} else {
workerSinkTask = (WorkerSinkTask) runnable;
}
- String connectorName = null != workerSourceTask ? workerSourceTask.getConnectorName() : workerSinkTask.getConnectorName();
+ String connectorName = null != workerSourceTask ? workerSourceTask.id().connector() : workerSinkTask.id().connector();
assertThat(connectorName).isIn("TEST-CONN-0", "TEST-CONN-1", "TEST-CONN-2", "TEST-CONN-3");
}
}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index 098601b..f82fcb2 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -57,6 +57,7 @@ import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
import org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.utils.Plugin;
import org.junit.Before;
import org.junit.Test;
@@ -212,13 +213,13 @@ public class RestHandlerTest {
RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName1", connectKeyValue));
- WorkerSourceTask workerSourceTask1 = new WorkerSourceTask("testConnectorName1", sourceTask, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator);
+ WorkerSourceTask workerSourceTask1 = new WorkerSourceTask(new ConnectConfig(),new ConnectorTaskId("testConnectorName1", 1), sourceTask,null, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator);
// create retry operator
RetryWithToleranceOperator retryWithToleranceOperator02 = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
retryWithToleranceOperator02.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName2", connectKeyValue));
- WorkerSourceTask workerSourceTask2 = new WorkerSourceTask("testConnectorName2", sourceTask, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator02);
+ WorkerSourceTask workerSourceTask2 = new WorkerSourceTask(new ConnectConfig(), new ConnectorTaskId("testConnectorName2", 1), sourceTask,null, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator02);
workerTasks = new HashSet<Runnable>() {
{
add(workerSourceTask1);