You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by rm...@apache.org on 2019/04/14 18:32:25 UTC

[bahir-flink] branch master updated: [BAHIR-204] [activemq] ActiveMQ Source only emits previously unprocessed records now

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f1b3ba  [BAHIR-204] [activemq] ActiveMQ Source only emits previously unprocessed records now
3f1b3ba is described below

commit 3f1b3ba91d48d33214dd060fc3c1fda2f708484b
Author: Konstantin Knauf <kn...@gmail.com>
AuthorDate: Thu Apr 11 17:11:56 2019 +0200

    [BAHIR-204] [activemq] ActiveMQ Source only emits previously unprocessed records now
---
 .../org/apache/flink/streaming/connectors/activemq/AMQSource.java  | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
index 4f2114f..0c43956 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
@@ -226,10 +226,11 @@ public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
             bytesMessage.readBytes(bytes);
             OUT value = deserializationSchema.deserialize(bytes);
             synchronized (ctx.getCheckpointLock()) {
-                ctx.collect(value);
-                if (!autoAck) {
-                    addId(bytesMessage.getJMSMessageID());
+                if (!autoAck && addId(bytesMessage.getJMSMessageID())) {
+                    ctx.collect(value);
                     unacknowledgedMessages.put(bytesMessage.getJMSMessageID(), bytesMessage);
+                } else {
+                    ctx.collect(value);
                 }
             }
         }