You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/05/29 07:07:59 UTC

[seatunnel] branch dev updated: [Bugfix][connector-v2][rabbitmq] Fix reduplicate ack msg bug and code style (#4842)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 985fb6642 [Bugfix][connector-v2][rabbitmq] Fix reduplicate ack msg bug and code style (#4842)
985fb6642 is described below

commit 985fb6642aca90c59f82bc3d3b49731a15523816
Author: Bibo <33...@users.noreply.github.com>
AuthorDate: Mon May 29 15:07:53 2023 +0800

    [Bugfix][connector-v2][rabbitmq] Fix reduplicate ack msg bug and code style (#4842)
    
    ---------
    
    Co-authored-by: 毕博 <bi...@mafengwo.com>
---
 docs/en/connector-v2/sink/Rabbitmq.md                        |  9 +++++++--
 .../connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java | 12 ++++++++++++
 .../seatunnel/rabbitmq/source/RabbitmqSourceReader.java      | 10 ++++++++--
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/docs/en/connector-v2/sink/Rabbitmq.md b/docs/en/connector-v2/sink/Rabbitmq.md
index 4f787e724..a0037813b 100644
--- a/docs/en/connector-v2/sink/Rabbitmq.md
+++ b/docs/en/connector-v2/sink/Rabbitmq.md
@@ -24,6 +24,7 @@ Used to write data to Rabbitmq.
 | network_recovery_interval  | int     | no       | -             |
 | topology_recovery_enabled  | boolean | no       | -             |
 | automatic_recovery_enabled | boolean | no       | -             |
+| use_correlation_id         | boolean | no       | false         |
 | connection_timeout         | int     | no       | -             |
 | rabbitmq.config            | map     | no       | -             |
 | common-options             |         | no       | -             |
@@ -66,14 +67,18 @@ the schema fields of upstream data.
 
 how long will automatic recovery wait before attempting to reconnect, in ms
 
-### topology_recovery [string]
+### topology_recovery_enabled [boolean]
 
 if true, enables topology recovery
 
-### automatic_recovery [string]
+### automatic_recovery_enabled [boolean]
 
 if true, enables connection recovery
 
+### use_correlation_id [boolean]
+
+whether the messages received are supplied with a unique id to deduplicate messages (in case of failed acknowledgments).
+
 ### connection_timeout [int]
 
 connection TCP establishment timeout in milliseconds; zero for infinite
diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
index 728e52c85..e8e2ce55c 100644
--- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
+++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
@@ -58,6 +58,7 @@ public class RabbitmqConfig implements Serializable {
     private String exchange = "";
 
     private boolean forE2ETesting = false;
+    private boolean usesCorrelationId = false;
 
     private final Map<String, Object> sinkOptionProps = new HashMap<>();
 
@@ -186,6 +187,14 @@ public class RabbitmqConfig implements Serializable {
                             "In addition to the above parameters that must be specified by the RabbitMQ client, the user can also specify multiple non-mandatory parameters for the client, "
                                     + "covering [all the parameters specified in the official RabbitMQ document](https://www.rabbitmq.com/configure.html).");
 
+    public static final Option<Boolean> USE_CORRELATION_ID =
+            Options.key("use_correlation_id")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Whether the messages received are supplied with a unique "
+                                    + "id to deduplicate messages (in case of failed acknowledgments).");
+
     private void parseSinkOptionProperties(Config pluginConfig) {
         if (CheckConfigUtil.isValidParam(pluginConfig, RABBITMQ_CONFIG.key())) {
             pluginConfig
@@ -247,6 +256,9 @@ public class RabbitmqConfig implements Serializable {
         if (config.hasPath(FOR_E2E_TESTING.key())) {
             this.forE2ETesting = config.getBoolean(FOR_E2E_TESTING.key());
         }
+        if (config.hasPath(USE_CORRELATION_ID.key())) {
+            this.usesCorrelationId = config.getBoolean(USE_CORRELATION_ID.key());
+        }
         parseSinkOptionProperties(config);
     }
 
diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java
index 29187a5cb..df80294f1 100644
--- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java
+++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
@@ -54,7 +55,7 @@ public class RabbitmqSourceReader<T> implements SourceReader<T, RabbitmqSplit> {
 
     protected final SourceReader.Context context;
     protected transient Channel channel;
-    private final boolean usesCorrelationId = false;
+    private final boolean usesCorrelationId;
     protected transient boolean autoAck;
 
     protected transient Set<String> correlationIdsProcessedButNotAcknowledged;
@@ -80,6 +81,7 @@ public class RabbitmqSourceReader<T> implements SourceReader<T, RabbitmqSplit> {
         this.config = config;
         this.rabbitMQClient = new RabbitmqClient(config);
         this.channel = rabbitMQClient.getChannel();
+        this.usesCorrelationId = config.isUsesCorrelationId();
     }
 
     @Override
@@ -113,6 +115,8 @@ public class RabbitmqSourceReader<T> implements SourceReader<T, RabbitmqSplit> {
         if (deliveryOptional.isPresent()) {
             Delivery delivery = deliveryOptional.get();
             AMQP.BasicProperties properties = delivery.getProperties();
+            String correlationId =
+                    Objects.isNull(properties) ? null : properties.getCorrelationId();
             byte[] body = delivery.getBody();
             Envelope envelope = delivery.getEnvelope();
             synchronized (output.getCheckpointLock()) {
@@ -135,7 +139,7 @@ public class RabbitmqSourceReader<T> implements SourceReader<T, RabbitmqSplit> {
     }
 
     @Override
-    public List snapshotState(long checkpointId) throws Exception {
+    public List<RabbitmqSplit> snapshotState(long checkpointId) throws Exception {
 
         List<RabbitmqSplit> pendingSplit =
                 Collections.singletonList(
@@ -159,6 +163,8 @@ public class RabbitmqSourceReader<T> implements SourceReader<T, RabbitmqSplit> {
                 correlationIds.addAll(currentCheckPointCorrelationIds);
             }
         }
+        // clear for next snapshot
+        deliveryTagsProcessedForCurrentSnapshot.clear();
         return pendingSplit;
     }