You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2019/05/18 07:49:57 UTC
[bahir] branch master updated: [MINOR] Handle case when no messages
from pubsub
This is an automated email from the ASF dual-hosted git repository.
lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir.git
The following commit(s) were added to refs/heads/master by this push:
new b681d69 [MINOR] Handle case when no messages from pubsub
b681d69 is described below
commit b681d691c28d2623bc8de047a15ed805e15987c8
Author: Grzegorz Lyczba <gr...@gmail.com>
AuthorDate: Fri May 17 00:53:05 2019 +0200
[MINOR] Handle case when no messages from pubsub
Method getReceivedMessages returns NULL when there is no message in
a subscription. Store for processing in Spark and prepare the ACK request
only when, at least, one message is ready for processing.
---
.../spark/streaming/pubsub/PubsubInputDStream.scala | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
index 1e4d8fa..7357d23 100644
--- a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
+++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala
@@ -22,7 +22,6 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
-import com.google.api.client.auth.oauth2.Credential
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
import com.google.api.client.googleapis.json.GoogleJsonResponseException
import com.google.api.client.json.jackson2.JacksonFactory
@@ -277,8 +276,9 @@ class PubsubReceiver(
try {
val pullResponse =
client.projects().subscriptions().pull(subscriptionFullName, pullRequest).execute()
- val receivedMessages = pullResponse.getReceivedMessages.asScala.toList
- store(receivedMessages
+ val receivedMessages = pullResponse.getReceivedMessages
+ if (receivedMessages != null) {
+ store(receivedMessages.asScala.toList
.map(x => {
val sm = new SparkPubsubMessage
sm.message = x.getMessage
@@ -287,10 +287,12 @@ class PubsubReceiver(
})
.iterator)
- if (autoAcknowledge) {
- val ackRequest = new AcknowledgeRequest()
- ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
- client.projects().subscriptions().acknowledge(subscriptionFullName, ackRequest).execute()
+ if (autoAcknowledge) {
+ val ackRequest = new AcknowledgeRequest()
+ ackRequest.setAckIds(receivedMessages.asScala.map(x => x.getAckId).asJava)
+ client.projects().subscriptions().acknowledge(subscriptionFullName,
+ ackRequest).execute()
+ }
}
backoff = INIT_BACKOFF
} catch {