You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by rm...@apache.org on 2019/04/14 18:32:25 UTC
[bahir-flink] branch master updated: [BAHIR-204] [activemq] ActiveMQ Source only emits previously unprocessed records now
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3f1b3ba [BAHIR-204] [activemq] ActiveMQ Source only emits previously unprocessed records now
3f1b3ba is described below
commit 3f1b3ba91d48d33214dd060fc3c1fda2f708484b
Author: Konstantin Knauf <kn...@gmail.com>
AuthorDate: Thu Apr 11 17:11:56 2019 +0200
[BAHIR-204] [activemq] ActiveMQ Source only emits previously unprocessed records now
---
.../org/apache/flink/streaming/connectors/activemq/AMQSource.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
index 4f2114f..0c43956 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
@@ -226,10 +226,11 @@ public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
bytesMessage.readBytes(bytes);
OUT value = deserializationSchema.deserialize(bytes);
synchronized (ctx.getCheckpointLock()) {
- ctx.collect(value);
- if (!autoAck) {
- addId(bytesMessage.getJMSMessageID());
+ if (!autoAck && addId(bytesMessage.getJMSMessageID())) {
+ ctx.collect(value);
unacknowledgedMessages.put(bytesMessage.getJMSMessageID(), bytesMessage);
+ } else {
+ ctx.collect(value);
}
}
}