You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/05/29 07:07:59 UTC
[seatunnel] branch dev updated: [Bugfix][connector-v2][rabbitmq] Fix reduplicate ack msg bug and code style (#4842)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 985fb6642 [Bugfix][connector-v2][rabbitmq] Fix reduplicate ack msg bug and code style (#4842)
985fb6642 is described below
commit 985fb6642aca90c59f82bc3d3b49731a15523816
Author: Bibo <33...@users.noreply.github.com>
AuthorDate: Mon May 29 15:07:53 2023 +0800
[Bugfix][connector-v2][rabbitmq] Fix reduplicate ack msg bug and code style (#4842)
---------
Co-authored-by: 毕博 <bi...@mafengwo.com>
---
docs/en/connector-v2/sink/Rabbitmq.md | 9 +++++++--
.../connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java | 12 ++++++++++++
.../seatunnel/rabbitmq/source/RabbitmqSourceReader.java | 10 ++++++++--
3 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/docs/en/connector-v2/sink/Rabbitmq.md b/docs/en/connector-v2/sink/Rabbitmq.md
index 4f787e724..a0037813b 100644
--- a/docs/en/connector-v2/sink/Rabbitmq.md
+++ b/docs/en/connector-v2/sink/Rabbitmq.md
@@ -24,6 +24,7 @@ Used to write data to Rabbitmq.
| network_recovery_interval | int | no | - |
| topology_recovery_enabled | boolean | no | - |
| automatic_recovery_enabled | boolean | no | - |
+| use_correlation_id | boolean | no | false |
| connection_timeout | int | no | - |
| rabbitmq.config | map | no | - |
| common-options | | no | - |
@@ -66,14 +67,18 @@ the schema fields of upstream data.
how long will automatic recovery wait before attempting to reconnect, in ms
-### topology_recovery [string]
+### topology_recovery_enabled [boolean]
if true, enables topology recovery
-### automatic_recovery [string]
+### automatic_recovery_enabled [boolean]
if true, enables connection recovery
+### use_correlation_id [boolean]
+
+whether the messages received are supplied with a unique id to deduplicate messages (in case of failed acknowledgments).
+
### connection_timeout [int]
connection TCP establishment timeout in milliseconds; zero for infinite
diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
index 728e52c85..e8e2ce55c 100644
--- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
+++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
@@ -58,6 +58,7 @@ public class RabbitmqConfig implements Serializable {
private String exchange = "";
private boolean forE2ETesting = false;
+ private boolean usesCorrelationId = false;
private final Map<String, Object> sinkOptionProps = new HashMap<>();
@@ -186,6 +187,14 @@ public class RabbitmqConfig implements Serializable {
"In addition to the above parameters that must be specified by the RabbitMQ client, the user can also specify multiple non-mandatory parameters for the client, "
+ "covering [all the parameters specified in the official RabbitMQ document](https://www.rabbitmq.com/configure.html).");
+ public static final Option<Boolean> USE_CORRELATION_ID =
+ Options.key("use_correlation_id")
+ .booleanType()
+ .noDefaultValue()
+ .withDescription(
+ "Whether the messages received are supplied with a unique "
+ + "id to deduplicate messages (in case of failed acknowledgments).");
+
private void parseSinkOptionProperties(Config pluginConfig) {
if (CheckConfigUtil.isValidParam(pluginConfig, RABBITMQ_CONFIG.key())) {
pluginConfig
@@ -247,6 +256,9 @@ public class RabbitmqConfig implements Serializable {
if (config.hasPath(FOR_E2E_TESTING.key())) {
this.forE2ETesting = config.getBoolean(FOR_E2E_TESTING.key());
}
+ if (config.hasPath(USE_CORRELATION_ID.key())) {
+ this.usesCorrelationId = config.getBoolean(USE_CORRELATION_ID.key());
+ }
parseSinkOptionProperties(config);
}
diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java
index 29187a5cb..df80294f1 100644
--- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java
+++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
@@ -54,7 +55,7 @@ public class RabbitmqSourceReader<T> implements SourceReader<T, RabbitmqSplit> {
protected final SourceReader.Context context;
protected transient Channel channel;
- private final boolean usesCorrelationId = false;
+ private final boolean usesCorrelationId;
protected transient boolean autoAck;
protected transient Set<String> correlationIdsProcessedButNotAcknowledged;
@@ -80,6 +81,7 @@ public class RabbitmqSourceReader<T> implements SourceReader<T, RabbitmqSplit> {
this.config = config;
this.rabbitMQClient = new RabbitmqClient(config);
this.channel = rabbitMQClient.getChannel();
+ this.usesCorrelationId = config.isUsesCorrelationId();
}
@Override
@@ -113,6 +115,8 @@ public class RabbitmqSourceReader<T> implements SourceReader<T, RabbitmqSplit> {
if (deliveryOptional.isPresent()) {
Delivery delivery = deliveryOptional.get();
AMQP.BasicProperties properties = delivery.getProperties();
+ String correlationId =
+ Objects.isNull(properties) ? null : properties.getCorrelationId();
byte[] body = delivery.getBody();
Envelope envelope = delivery.getEnvelope();
synchronized (output.getCheckpointLock()) {
@@ -135,7 +139,7 @@ public class RabbitmqSourceReader<T> implements SourceReader<T, RabbitmqSplit> {
}
@Override
- public List snapshotState(long checkpointId) throws Exception {
+ public List<RabbitmqSplit> snapshotState(long checkpointId) throws Exception {
List<RabbitmqSplit> pendingSplit =
Collections.singletonList(
@@ -159,6 +163,8 @@ public class RabbitmqSourceReader<T> implements SourceReader<T, RabbitmqSplit> {
correlationIds.addAll(currentCheckPointCorrelationIds);
}
}
+ // clear for next snapshot
+ deliveryTagsProcessedForCurrentSnapshot.clear();
return pendingSplit;
}