You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by su...@apache.org on 2022/09/13 02:46:10 UTC

[rocketmq-connect] branch master updated: [ISSUE #314] Worker sink task bug fix (#315)

This is an automated email from the ASF dual-hosted git repository.

sunxiaojian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 98748109 [ISSUE #314] Worker sink task bug fix (#315)
98748109 is described below

commit 987481090d0daacdb0021b1c32ebc1a4d83988e4
Author: zhoubo <87...@qq.com>
AuthorDate: Tue Sep 13 10:46:04 2022 +0800

    [ISSUE #314] Worker sink task bug fix (#315)
    
    * consume.from.where format
    checkAndStopConnectors bug fix
    remove sinkTask.flush
    removeAndCloseMessageQueue topic bug fix
    removeMessageQueues filter the same topic
    sinkTaskContext.getPausedQueues().retainAll messageQueues
    
    * remove duplicate connectors.remove & optimize consumer setMessageQueueListener
---
 .../connect/runtime/config/ConnectorConfig.java    |  3 ++
 .../connect/runtime/connectorwrapper/Worker.java   | 10 +++-
 .../runtime/connectorwrapper/WorkerSinkTask.java   | 59 ++++++++++++----------
 .../runtime/connectorwrapper/WorkerTask.java       |  7 ++-
 4 files changed, 45 insertions(+), 34 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java
index cf0f17af..58310fbf 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java
@@ -95,6 +95,9 @@ public class ConnectorConfig {
     public static final String ERRORS_TOLERANCE_CONFIG = "errors.tolerance";
     public static final ToleranceType ERRORS_TOLERANCE_DEFAULT = ToleranceType.NONE;
 
+    public static final String CONSUME_FROM_WHERE = "consume.from.where";
+
+
     /**
      * The required key for all configurations.
      */
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index bcd889b4..4ba7baff 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -244,12 +244,18 @@ public class Worker {
      * @param assigns
      */
     private void checkAndStopConnectors(Collection<String> assigns) {
-        if (assigns.isEmpty()) {
+        if (CollectionUtils.isEmpty(assigns)) {
             // delete all
-            assigns = connectors.keySet();
+            Set<String> connectors = this.connectors.keySet();
+            for (String connector : connectors) {
+                log.info("It may be that the load balancing assigns this connector to other nodes,connector {}", connector);
+                stopAndAwaitConnector(connector);
+            }
+            return;
         }
         for (String connectorName : assigns) {
             if (!assigns.contains(connectorName)) {
+                log.info("It may be that the load balancing assigns this connector to other nodes,connector {}", connectorName);
                 stopAndAwaitConnector(connectorName);
             }
         }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 14dc9cbc..c4fbc095 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -304,7 +304,6 @@ public class WorkerSinkTask extends WorkerTask {
         } finally {
             if (closing) {
                 log.trace("{} Closing the task before committing the offsets: {}", this, offsetsToCommit);
-                sinkTask.flush(taskProvidedRecordOffsets);
             }
         }
         if (taskProvidedOffsets.isEmpty()) {
@@ -574,34 +573,37 @@ public class WorkerSinkTask extends WorkerTask {
             for (String topic : topics) {
                 consumer.setPullBatchSize(MAX_MESSAGE_NUM);
                 consumer.subscribe(topic, "*");
-                if (messageQueueListener == null) {
-                    messageQueueListener = consumer.getMessageQueueListener();
-                }
-                consumer.setMessageQueueListener(new MessageQueueListener() {
-                    @Override
-                    public void messageQueueChanged(String subTopic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
-                        // update assign message queue
-                        messageQueueListener.messageQueueChanged(subTopic, mqAll, mqDivided);
-                        // listener message queue changed
-                        log.info("Message queue changed start, old message queues offset {}", JSON.toJSONString(messageQueues));
-
-                        if (isStopping()) {
-                            log.trace("Skipping partition revocation callback as task has already been stopped");
-                            return;
-                        }
-                        // remove and close message queue
-                        removeAndCloseMessageQueue(topic, mqDivided);
+            }
+            if (messageQueueListener == null) {
+                messageQueueListener = consumer.getMessageQueueListener();
+            }
+            consumer.setMessageQueueListener(new MessageQueueListener() {
+                @Override
+                public void messageQueueChanged(String subTopic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+                    // update assign message queue
+                    messageQueueListener.messageQueueChanged(subTopic, mqAll, mqDivided);
+                    // listener message queue changed
+                    log.info("Message queue changed start, old message queues offset {}", JSON.toJSONString(messageQueues));
+
+                    if (isStopping()) {
+                        log.trace("Skipping partition revocation callback as task has already been stopped");
+                        return;
+                    }
+                    // remove and close message queue
+                    log.info("Task {},MessageQueueChanged, old messageQueuesOffsetMap {}", id.toString(), JSON.toJSONString(messageQueues));
+                    removeAndCloseMessageQueue(subTopic, mqDivided);
 
-                        // add new message queue
-                        assignMessageQueue(mqDivided);
-                        preCommit();
-                        log.info("Message queue changed start, new message queues offset {}", JSON.toJSONString(messageQueues));
+                    // add new message queue
+                    assignMessageQueue(mqDivided);
+                    log.info("Task {}, Message queue changed end, new message queues offset {}", id, JSON.toJSONString(messageQueues));
+                    preCommit();
+                    log.info("Message queue changed start, new message queues offset {}", JSON.toJSONString(messageQueues));
 
-                    }
-                });
-            }
+                }
+            });
             consumer.start();
         } catch (MQClientException e) {
+            log.error("Task {},InitializeAndStart MQClientException", id.toString(), e);
             throw new ConnectException(e);
         }
         log.info("Sink task consumer start. taskConfig {}", JSON.toJSONString(taskConfig));
@@ -626,7 +628,7 @@ public class WorkerSinkTask extends WorkerTask {
             }
         }
         // filter not contains in messageQueues
-        removeMessageQueues = messageQueues.stream().filter(messageQueue -> !queues.contains(messageQueue)).collect(Collectors.toSet());
+        removeMessageQueues = messageQueues.stream().filter(messageQueue -> topic.equals(messageQueue.getTopic()) && !queues.contains(messageQueue)).collect(Collectors.toSet());
         if (removeMessageQueues == null || removeMessageQueues.isEmpty()) {
             return;
         }
@@ -696,8 +698,9 @@ public class WorkerSinkTask extends WorkerTask {
                 resumeAll();
             }
             // reset
-            sinkTaskContext.getPausedQueues().retainAll(queues);
+            sinkTaskContext.getPausedQueues().retainAll(messageQueues);
             if (shouldPause()) {
+                pauseAll();
                 return;
             }
             if (!sinkTaskContext.getPausedQueues().isEmpty()) {
@@ -725,7 +728,7 @@ public class WorkerSinkTask extends WorkerTask {
         }
 
         if (offset < 0) {
-            String consumeFromWhere = taskConfig.getString("consume-from-where");
+            String consumeFromWhere = taskConfig.getString(ConnectorConfig.CONSUME_FROM_WHERE);
             if (StringUtils.isBlank(consumeFromWhere)) {
                 consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.name();
             }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java
index 9dace233..90e7fba5 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java
@@ -17,6 +17,9 @@
 package org.apache.rocketmq.connect.runtime.connectorwrapper;
 
 import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.status.TaskStatus;
@@ -27,10 +30,6 @@ import org.apache.rocketmq.connect.runtime.utils.CurrentTaskState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 /**
  * Should we use callable here ?
  */