You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by su...@apache.org on 2022/09/13 02:46:10 UTC
[rocketmq-connect] branch master updated: [ISSUE #314] Worker sink task bug fix (#315)
This is an automated email from the ASF dual-hosted git repository.
sunxiaojian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 98748109 [ISSUE #314] Worker sink task bug fix (#315)
98748109 is described below
commit 987481090d0daacdb0021b1c32ebc1a4d83988e4
Author: zhoubo <87...@qq.com>
AuthorDate: Tue Sep 13 10:46:04 2022 +0800
[ISSUE #314] Worker sink task bug fix (#315)
* consume.from.where formate
checkAndStopConnectors bug fix
remove sinkTask.flush
removeAndCloseMessageQueue topic bug fix
removeMessageQueues filter the same topic
sinkTaskContext.getPausedQueues().retainAll messageQueues
* remove duplicate connectors.remove & optimize consumer setMessageQueueListener
---
.../connect/runtime/config/ConnectorConfig.java | 3 ++
.../connect/runtime/connectorwrapper/Worker.java | 10 +++-
.../runtime/connectorwrapper/WorkerSinkTask.java | 59 ++++++++++++----------
.../runtime/connectorwrapper/WorkerTask.java | 7 ++-
4 files changed, 45 insertions(+), 34 deletions(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java
index cf0f17af..58310fbf 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java
@@ -95,6 +95,9 @@ public class ConnectorConfig {
public static final String ERRORS_TOLERANCE_CONFIG = "errors.tolerance";
public static final ToleranceType ERRORS_TOLERANCE_DEFAULT = ToleranceType.NONE;
+ public static final String CONSUME_FROM_WHERE = "consume.from.where";
+
+
/**
* The required key for all configurations.
*/
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index bcd889b4..4ba7baff 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -244,12 +244,18 @@ public class Worker {
* @param assigns
*/
private void checkAndStopConnectors(Collection<String> assigns) {
- if (assigns.isEmpty()) {
+ if (CollectionUtils.isEmpty(assigns)) {
// delete all
- assigns = connectors.keySet();
+ Set<String> connectors = this.connectors.keySet();
+ for (String connector : connectors) {
+ log.info("It may be that the load balancing assigns this connector to other nodes,connector {}", connector);
+ stopAndAwaitConnector(connector);
+ }
+ return;
}
for (String connectorName : assigns) {
if (!assigns.contains(connectorName)) {
+ log.info("It may be that the load balancing assigns this connector to other nodes,connector {}", connectorName);
stopAndAwaitConnector(connectorName);
}
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 14dc9cbc..c4fbc095 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -304,7 +304,6 @@ public class WorkerSinkTask extends WorkerTask {
} finally {
if (closing) {
log.trace("{} Closing the task before committing the offsets: {}", this, offsetsToCommit);
- sinkTask.flush(taskProvidedRecordOffsets);
}
}
if (taskProvidedOffsets.isEmpty()) {
@@ -574,34 +573,37 @@ public class WorkerSinkTask extends WorkerTask {
for (String topic : topics) {
consumer.setPullBatchSize(MAX_MESSAGE_NUM);
consumer.subscribe(topic, "*");
- if (messageQueueListener == null) {
- messageQueueListener = consumer.getMessageQueueListener();
- }
- consumer.setMessageQueueListener(new MessageQueueListener() {
- @Override
- public void messageQueueChanged(String subTopic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
- // update assign message queue
- messageQueueListener.messageQueueChanged(subTopic, mqAll, mqDivided);
- // listener message queue changed
- log.info("Message queue changed start, old message queues offset {}", JSON.toJSONString(messageQueues));
-
- if (isStopping()) {
- log.trace("Skipping partition revocation callback as task has already been stopped");
- return;
- }
- // remove and close message queue
- removeAndCloseMessageQueue(topic, mqDivided);
+ }
+ if (messageQueueListener == null) {
+ messageQueueListener = consumer.getMessageQueueListener();
+ }
+ consumer.setMessageQueueListener(new MessageQueueListener() {
+ @Override
+ public void messageQueueChanged(String subTopic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+ // update assign message queue
+ messageQueueListener.messageQueueChanged(subTopic, mqAll, mqDivided);
+ // listener message queue changed
+ log.info("Message queue changed start, old message queues offset {}", JSON.toJSONString(messageQueues));
+
+ if (isStopping()) {
+ log.trace("Skipping partition revocation callback as task has already been stopped");
+ return;
+ }
+ // remove and close message queue
+ log.info("Task {},MessageQueueChanged, old messageQueuesOffsetMap {}", id.toString(), JSON.toJSONString(messageQueues));
+ removeAndCloseMessageQueue(subTopic, mqDivided);
- // add new message queue
- assignMessageQueue(mqDivided);
- preCommit();
- log.info("Message queue changed start, new message queues offset {}", JSON.toJSONString(messageQueues));
+ // add new message queue
+ assignMessageQueue(mqDivided);
+ log.info("Task {}, Message queue changed end, new message queues offset {}", id, JSON.toJSONString(messageQueues));
+ preCommit();
+ log.info("Message queue changed start, new message queues offset {}", JSON.toJSONString(messageQueues));
- }
- });
- }
+ }
+ });
consumer.start();
} catch (MQClientException e) {
+ log.error("Task {},InitializeAndStart MQClientException", id.toString(), e);
throw new ConnectException(e);
}
log.info("Sink task consumer start. taskConfig {}", JSON.toJSONString(taskConfig));
@@ -626,7 +628,7 @@ public class WorkerSinkTask extends WorkerTask {
}
}
// filter not contains in messageQueues
- removeMessageQueues = messageQueues.stream().filter(messageQueue -> !queues.contains(messageQueue)).collect(Collectors.toSet());
+ removeMessageQueues = messageQueues.stream().filter(messageQueue -> topic.equals(messageQueue.getTopic()) && !queues.contains(messageQueue)).collect(Collectors.toSet());
if (removeMessageQueues == null || removeMessageQueues.isEmpty()) {
return;
}
@@ -696,8 +698,9 @@ public class WorkerSinkTask extends WorkerTask {
resumeAll();
}
// reset
- sinkTaskContext.getPausedQueues().retainAll(queues);
+ sinkTaskContext.getPausedQueues().retainAll(messageQueues);
if (shouldPause()) {
+ pauseAll();
return;
}
if (!sinkTaskContext.getPausedQueues().isEmpty()) {
@@ -725,7 +728,7 @@ public class WorkerSinkTask extends WorkerTask {
}
if (offset < 0) {
- String consumeFromWhere = taskConfig.getString("consume-from-where");
+ String consumeFromWhere = taskConfig.getString(ConnectorConfig.CONSUME_FROM_WHERE);
if (StringUtils.isBlank(consumeFromWhere)) {
consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.name();
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java
index 9dace233..90e7fba5 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.connect.runtime.connectorwrapper;
import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.status.TaskStatus;
@@ -27,10 +30,6 @@ import org.apache.rocketmq.connect.runtime.utils.CurrentTaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
/**
* Should we use callable here ?
*/