You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/02/02 23:00:36 UTC

spark git commit: [SPARK-4631][streaming][FIX] Wait for a receiver to start before publishing test data.

Repository: spark
Updated Branches:
  refs/heads/master 683e93824 -> e908322cd


[SPARK-4631][streaming][FIX] Wait for a receiver to start before publishing test data.

This fixes two sources of non-deterministic failures in MQTTStreamSuite:

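- wait for a receiver to be up before pushing data through MQTT (and install
the receiver's MQTT callback before connecting/subscribing, so early messages
are not lost)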
- gracefully handle the case where the MQTT client is overloaded. There's
a hard-coded limit of 10 in-flight messages, and this test may hit it.
Instead of crashing, we wait briefly (50 ms) and retry sending the message.

Both of these are needed to make the test pass reliably on my machine.

Author: Iulian Dragos <ja...@gmail.com>

Closes #4270 from dragos/issue/fix-flaky-test-SPARK-4631 and squashes the following commits:

f66c482 [Iulian Dragos] [SPARK-4631][streaming] Wait for a receiver to start before publishing test data.
d408a8e [Iulian Dragos] Install callback before connecting to MQTT broker.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e908322c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e908322c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e908322c

Branch: refs/heads/master
Commit: e908322cd5991e6cbdaaafb8cd494759dac01225
Parents: 683e938
Author: Iulian Dragos <ja...@gmail.com>
Authored: Mon Feb 2 14:00:33 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Feb 2 14:00:33 2015 -0800

----------------------------------------------------------------------
 .../spark/streaming/mqtt/MQTTInputDStream.scala | 26 ++++++++-------
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  | 35 ++++++++++++++++++--
 2 files changed, 46 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e908322c/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 77661f7..1ef91dd 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -55,14 +55,14 @@ class MQTTInputDStream(
     brokerUrl: String,
     topic: String,
     storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[String](ssc_) with Logging {
-  
+  ) extends ReceiverInputDStream[String](ssc_) {
+
   def getReceiver(): Receiver[String] = {
     new MQTTReceiver(brokerUrl, topic, storageLevel)
   }
 }
 
-private[streaming] 
+private[streaming]
 class MQTTReceiver(
     brokerUrl: String,
     topic: String,
@@ -72,21 +72,15 @@ class MQTTReceiver(
   def onStop() {
 
   }
-  
+
   def onStart() {
 
-    // Set up persistence for messages 
+    // Set up persistence for messages
     val persistence = new MemoryPersistence()
 
     // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
     val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
 
-    // Connect to MqttBroker
-    client.connect()
-
-    // Subscribe to Mqtt topic
-    client.subscribe(topic)
-
     // Callback automatically triggers as and when new message arrives on specified topic
     val callback: MqttCallback = new MqttCallback() {
 
@@ -103,7 +97,15 @@ class MQTTReceiver(
       }
     }
 
-    // Set up callback for MqttClient
+    // Set up callback for MqttClient. This needs to happen before
+    // connecting or subscribing, otherwise messages may be lost
     client.setCallback(callback)
+
+    // Connect to MqttBroker
+    client.connect()
+
+    // Subscribe to Mqtt topic
+    client.subscribe(topic)
+
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e908322c/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index fe53a29..e84adc0 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.streaming.mqtt
 
 import java.net.{URI, ServerSocket}
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -32,6 +34,8 @@ import org.scalatest.concurrent.Eventually
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.scheduler.StreamingListener
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
 import org.apache.spark.SparkConf
 import org.apache.spark.util.Utils
 
@@ -67,7 +71,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
     val sendMessage = "MQTT demo for spark streaming"
     val receiveStream: ReceiverInputDStream[String] =
       MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
-    var receiveMessage: List[String] = List()
+    @volatile var receiveMessage: List[String] = List()
     receiveStream.foreachRDD { rdd =>
       if (rdd.collect.length > 0) {
         receiveMessage = receiveMessage ::: List(rdd.first)
@@ -75,6 +79,11 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
       }
     }
     ssc.start()
+
+    // wait for the receiver to start before publishing data, or we risk failing
+    // the test nondeterministically. See SPARK-4631
+    waitForReceiverToStart()
+
     publishData(sendMessage)
     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
       assert(sendMessage.equals(receiveMessage(0)))
@@ -121,8 +130,14 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
         val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
         message.setQos(1)
         message.setRetained(true)
-        for (i <- 0 to 100) {
-          msgTopic.publish(message)
+
+        for (i <- 0 to 10) {
+          try {
+            msgTopic.publish(message)
+          } catch {
+            case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
+              Thread.sleep(50) // wait for Spark streaming to consume something from the message queue
+          }
         }
       }
     } finally {
@@ -131,4 +146,18 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
       client = null
     }
   }
+
+  /**
+   * Block until at least one receiver has started or timeout occurs.
+   */
+  private def waitForReceiverToStart() = {
+    val latch = new CountDownLatch(1)
+    ssc.addStreamingListener(new StreamingListener {
+      override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
+        latch.countDown()
+      }
+    })
+
+    assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org