You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2019/05/18 07:49:57 UTC

[bahir] branch master updated: [MINOR] Handle case when no messages from pubsub

This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir.git


The following commit(s) were added to refs/heads/master by this push:
     new b681d69  [MINOR] Handle case when no messages from pubsub
b681d69 is described below

commit b681d691c28d2623bc8de047a15ed805e15987c8
Author: Grzegorz Lyczba <gr...@gmail.com>
AuthorDate: Fri May 17 00:53:05 2019 +0200

    [MINOR] Handle case when no messages from pubsub
    
    Method getReceivedMessages returns NULL when there is no message in
    a subscription. Store for processing in Spark and prepare the ACK request
    only when, at least, one message is ready for processing.
---
 .../spark/streaming/pubsub/PubsubInputDStream.scala      | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
index 1e4d8fa..7357d23 100644
--- a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
+++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
@@ -22,7 +22,6 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import com.google.api.client.auth.oauth2.Credential
 import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
 import com.google.api.client.googleapis.json.GoogleJsonResponseException
 import com.google.api.client.json.jackson2.JacksonFactory
@@ -277,8 +276,9 @@ class PubsubReceiver(
       try {
         val pullResponse =
           client.projects().subscriptions().pull(subscriptionFullName, pullRequest).execute()
-        val receivedMessages = pullResponse.getReceivedMessages.asScala.toList
-        store(receivedMessages
+        val receivedMessages = pullResponse.getReceivedMessages
+        if (receivedMessages != null) {
+          store(receivedMessages.asScala.toList
             .map(x => {
               val sm = new SparkPubsubMessage
               sm.message = x.getMessage
@@ -287,10 +287,12 @@ class PubsubReceiver(
             })
             .iterator)
 
-        if (autoAcknowledge) {
-          val ackRequest = new AcknowledgeRequest()
-          ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
-          client.projects().subscriptions().acknowledge(subscriptionFullName, ackRequest).execute()
+          if (autoAcknowledge) {
+            val ackRequest = new AcknowledgeRequest()
+            ackRequest.setAckIds(receivedMessages.asScala.map(x => x.getAckId).asJava)
+            client.projects().subscriptions().acknowledge(subscriptionFullName,
+              ackRequest).execute()
+          }
         }
         backoff = INIT_BACKOFF
       } catch {