You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2017/12/22 14:38:35 UTC

[GitHub] merlimat closed pull request #1000: Make sure nextTuple emits tuple with non-null values

merlimat closed pull request #1000: Make sure nextTuple emits tuple with non-null values
URL: https://github.com/apache/incubator-pulsar/pull/1000
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 1e6469d4a..639adb915 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -158,6 +158,16 @@ public void fail(Object msgId) {
      */
     @Override
     public void nextTuple() {
+        emitNextAvailableTuple();
+    }
+    
+    /**
+     * It makes sure that it emits next available non-tuple to topology unless consumer queue doesn't have any message
+     * available. It receives message from consumer queue and converts it to tuple and emits to topology. if the
+     * converted tuple is null then it tries to receives next message and perform the same until it finds non-tuple to
+     * emit.
+     */
+    public void emitNextAvailableTuple() {
         Message msg;
 
         // check if there are any failed messages to re-emit in the topology
@@ -182,12 +192,18 @@ public void nextTuple() {
                 LOG.debug("[{}] Receiving the next message from pulsar consumer to emit to the collector", spoutId);
             }
             try {
-                msg = consumer.receive(1, TimeUnit.SECONDS);
-                if (msg != null) {
-                    ++messagesReceived;
-                    messageSizeReceived += msg.getData().length;
+                boolean done = false;
+                while (!done) {
+                    msg = consumer.receive(100, TimeUnit.MILLISECONDS);
+                    if (msg != null) {
+                        ++messagesReceived;
+                        messageSizeReceived += msg.getData().length;
+                        done = mapToValueAndEmit(msg);
+                    } else {
+                        // queue is empty and nothing to emit
+                        done = true;
+                    }
                 }
-                mapToValueAndEmit(msg);
             } catch (PulsarClientException e) {
                 LOG.error("[{}] Error receiving message from pulsar consumer", spoutId, e);
             }
@@ -228,7 +244,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
 
     }
 
-    private void mapToValueAndEmit(Message msg) {
+    private boolean mapToValueAndEmit(Message msg) {
         if (msg != null) {
             Values values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
             ++pendingAcks;
@@ -244,8 +260,10 @@ private void mapToValueAndEmit(Message msg) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("[{}] Emitted message {} to the collector", spoutId, msg.getMessageId());
                 }
+                return true;
             }
         }
+        return false;
     }
 
     public class MessageRetries {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services