Posted to commits@bahir.apache.org by lr...@apache.org on 2019/05/17 21:38:09 UTC

[bahir-flink] branch master updated: [BAHIR-190] Fixed premature exit on empty queue

This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e5b1cae  [BAHIR-190] Fixed premature exit on empty queue
e5b1cae is described below

commit e5b1cae62f0fb47b15ff69d354b779b6072c27cf
Author: Krystex <58...@users.noreply.github.com>
AuthorDate: Thu May 9 12:11:28 2019 +0200

    [BAHIR-190] Fixed premature exit on empty queue
    
    When the source queue has no more messages, the source now keeps
    polling instead of returning, so the job no longer exits. This
    was a problem with ActiveMQ.
    
    Closes #53
---
 .../java/org/apache/flink/streaming/connectors/activemq/AMQSource.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
index 0c43956..8b8c948 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
@@ -219,7 +219,7 @@ public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
             Message message = consumer.receive(1000);
             if (! (message instanceof BytesMessage)) {
                 LOG.warn("Active MQ source received non bytes message: {}", message);
-                return;
+                continue;
             }
             BytesMessage bytesMessage = (BytesMessage) message;
             byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
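
Note on the change above: the following is a minimal, hypothetical sketch (not the connector's code) of why replacing `return` with `continue` matters. It assumes the patched statement sits inside a polling loop, which the hunk does not show, and it models a receive timeout as a null delivery, since JMS `MessageConsumer.receive(long)` returns null when the timeout expires. The names `PollingLoopSketch`, `poll`, and `exitOnUnexpected` are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PollingLoopSketch {

    /**
     * Walks a scripted list of deliveries. A byte[] stands in for a
     * BytesMessage; null stands in for a receive() call that timed out
     * because the queue was empty. (Assumption: the real source polls
     * in a loop like this one.)
     */
    static List<byte[]> poll(List<Object> deliveries, boolean exitOnUnexpected) {
        List<byte[]> collected = new ArrayList<>();
        for (Object message : deliveries) {
            if (!(message instanceof byte[])) {
                if (exitOnUnexpected) {
                    // Behaviour before the patch: leave the loop for good.
                    return collected;
                }
                // Behaviour after the patch: skip this round and poll again.
                continue;
            }
            collected.add((byte[]) message);
        }
        return collected;
    }

    public static void main(String[] args) {
        // One message, then an empty-queue timeout, then another message.
        List<Object> deliveries = Arrays.asList(new byte[] {1}, null, new byte[] {2});

        System.out.println("with return:   " + poll(deliveries, true).size() + " message(s)");
        System.out.println("with continue: " + poll(deliveries, false).size() + " message(s)");
    }
}
```

With `return`, the message that arrives after the timeout is never read because the loop has already ended; with `continue`, the loop survives the empty poll and picks it up.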