Posted to reviews@spark.apache.org by Bilna <gi...@git.apache.org> on 2014/12/30 14:12:34 UTC

[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

GitHub user Bilna opened a pull request:

    https://github.com/apache/spark/pull/3844

    [SPARK-4631] unit test for MQTT

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Bilna/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/3844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3844
    
----
commit 86164950acfc794c6c9b1db3663716ac4626c55b
Author: bilna <bi...@am.amrita.edu>
Date:   2014-12-30T13:06:09Z

    [SPARK-4631] unit test for MQTT

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22397277
  
    --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
    @@ -17,31 +17,65 @@
     
     package org.apache.spark.streaming.mqtt
     
    -import org.scalatest.FunSuite
    -
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually
    +import scala.concurrent.duration._
     import org.apache.spark.streaming.{Seconds, StreamingContext}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
     
    -class MQTTStreamSuite extends FunSuite {
    -
    -  val batchDuration = Seconds(1)
    +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
     
    +  private val batchDuration = Seconds(1)
       private val master: String = "local[2]"
    -
       private val framework: String = this.getClass.getSimpleName
    +  private val brokerUrl = "tcp://localhost:1883"
    +  private val topic = "def"
    +  private var ssc: StreamingContext = _
     
    -  test("mqtt input stream") {
    -    val ssc = new StreamingContext(master, framework, batchDuration)
    -    val brokerUrl = "abc"
    -    val topic = "def"
    +  before {
    +    ssc = new StreamingContext(master, framework, batchDuration)
    +  }
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +      ssc = null
    +    }
    +  }
     
    -    // tests the API, does not actually test data receiving
    -    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
    -    val test2: ReceiverInputDStream[String] =
    +  test("mqtt input stream") {
    +    val sendMessage = "MQTT demo for spark streaming"
    +    publishData(sendMessage)
    +    val receiveStream: ReceiverInputDStream[String] =
           MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
    -
    -    // TODO: Actually test receiving data
    +    var receiveMessage: String = ""
    +    receiveStream.foreachRDD { rdd =>
    +      receiveMessage = rdd.first
    +      receiveMessage
    +    }
    +    ssc.start()
    +    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
    +      assert(sendMessage.equals(receiveMessage))
    +    }
         ssc.stop()
       }
    +
    +  def publishData(sendMessage: String): Unit = {
    +    try {
    +      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence("/tmp")
    +      val client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
    +      client.connect()
    +      val msgTopic: MqttTopic = client.getTopic(topic)
    +      val message: MqttMessage = new MqttMessage(String.valueOf(sendMessage).getBytes("utf-8"))
    +      message.setQos(1)
    +      message.setRetained(true)
    +      msgTopic.publish(message)
    +      println("Published data \ntopic: " + msgTopic.getName() + "\nMessage: " + message)
    +      client.disconnect()
    +    } catch {
    +      case e: MqttException => println("Exception Caught: " + e)
    +    }
    --- End diff --
    
    Shouldn't there be a `finally` to close any running servers (client, etc.)?
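
A minimal sketch of what the `finally` suggestion above could look like. `StubClient` is a hypothetical stand-in used only to keep the example self-contained; it is not the Paho `MqttClient` API, and the empty-payload failure is an invented trigger for the error path.

```scala
// Hypothetical stand-in for an MQTT client (assumption, not the Paho API).
final class StubClient {
  var connected = false
  def connect(): Unit = { connected = true }
  // Fails on an empty payload so the error path can be exercised.
  def publish(payload: Array[Byte]): Unit = {
    if (payload.isEmpty) throw new IllegalStateException("empty payload")
  }
  def disconnect(): Unit = { connected = false }
}

object PublishWithFinally {
  // Publishes `data` and always disconnects, even if connect or publish throws.
  def publishData(client: StubClient, data: String): Unit = {
    try {
      client.connect()
      client.publish(data.getBytes("utf-8"))
    } finally {
      if (client.connected) client.disconnect()
    }
  }
}
```

With this shape the exception still propagates to the caller, so the test fails visibly instead of printing the error and continuing, while the connection is released either way.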




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68589130
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25007/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68427356
  
      [Test build #24949 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24949/consoleFull) for   PR 3844 at commit [`4b58094`](https://github.com/apache/spark/commit/4b580943de5137e947d1a6cdadd054020932ed8e).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by dragos <gi...@git.apache.org>.
Github user dragos commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-71852913
  
    There also seems to be a race condition introduced by this test. It fails consistently for me (but passes if I add a `Thread.sleep(50)` inside `publishData`). I'll open a ticket.
    
    ```
    [info] - mqtt input stream *** FAILED *** (552 milliseconds)
    [info]   org.eclipse.paho.client.mqttv3.MqttException: Too many publishes in progress
    [info]   at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:432)
    [info]   at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:121)
    [info]   at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:139)
    [info]   at org.eclipse.paho.client.mqttv3.MqttTopic.publish(MqttTopic.java:107)
    [info]   at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$publishData$1.apply(MQTTStreamSuite.scala:126)
    [info]   at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$publishData$1.apply(MQTTStreamSuite.scala:124)
    [info]   at scala.collection.immutable.Range.foreach(Range.scala:141)
    [info]   at org.apache.spark.streaming.mqtt.MQTTStreamSuite.publishData(MQTTStreamSuite.scala:124)
    [info]   at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply$mcV$sp(MQTTStreamSuite.scala:78)
    [info]   at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply(MQTTStreamSuite.scala:66)
    [info]   at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply(MQTTStreamSuite.scala:66)
    [info]   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
    [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
    [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
    [info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
    [info]   at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
    ```
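
One possible workaround for the failure reported above, sketched under assumptions: the publisher retries after a short back-off whenever the client reports that too many publishes are in progress. `WindowedPublisher`, `TooManyInFlight`, and the window size of 10 are hypothetical stand-ins chosen for illustration; they are not the Paho classes, and the back-off is passed in as a function so the sketch stays deterministic.

```scala
// Hypothetical exception mirroring the "Too many publishes in progress" failure.
final class TooManyInFlight extends RuntimeException("Too many publishes in progress")

// Hypothetical client with a bounded window of unacknowledged messages.
final class WindowedPublisher(maxInFlight: Int) {
  private var inFlight = 0
  var delivered = 0
  def publish(msg: String): Unit = {
    if (inFlight >= maxInFlight) throw new TooManyInFlight
    inFlight += 1
  }
  // Simulates the broker acknowledging everything currently in flight.
  def acknowledgeAll(): Unit = { delivered += inFlight; inFlight = 0 }
}

object ThrottledPublish {
  // Publishes `count` copies of `msg`, calling `backoff` whenever the window is full.
  // Returns the number of retries that were needed.
  def publishAll(p: WindowedPublisher, msg: String, count: Int, backoff: () => Unit): Int = {
    var retries = 0
    var sent = 0
    while (sent < count) {
      try {
        p.publish(msg)
        sent += 1
      } catch {
        case _: TooManyInFlight =>
          retries += 1
          backoff() // in a real test this might be Thread.sleep(50)
      }
    }
    retries
  }
}
```

In a real suite the back-off would give the broker time to acknowledge earlier messages; here `acknowledgeAll()` can be passed as the back-off to simulate that.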




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68525514
  
      [Test build #24994 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24994/consoleFull) for   PR 3844 at commit [`04503cf`](https://github.com/apache/spark/commit/04503cfa7f8168038c17198b6e45b16b89591e74).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter `





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68419761
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24922/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22372726
  
    --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
    @@ -17,31 +17,58 @@
     
     package org.apache.spark.streaming.mqtt
     
    +import org.apache.spark.Logging
     import org.scalatest.FunSuite
    -
    +import org.scalatest.concurrent.Eventually
    +import scala.concurrent.duration._
     import org.apache.spark.streaming.{Seconds, StreamingContext}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
     
    -class MQTTStreamSuite extends FunSuite {
    +abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging {
     
       val batchDuration = Seconds(1)
    +  val master: String = "local[2]"
    +  val framework: String = this.getClass.getSimpleName
    +  val brokerUrl = "tcp://localhost:1883"
    +  val topic = "def"
    +  
    +  def publishData(sendMessage: String): Unit = {
    +    try {
    +      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence("/tmp")
    +      val client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
    +      client.connect()
    +      val msgTopic: MqttTopic = client.getTopic(topic)
    +      val message: MqttMessage = new MqttMessage(String.valueOf(sendMessage).getBytes("utf-8"))
    +      message.setQos(1)
    +      message.setRetained(true)
    +      msgTopic.publish(message)
    +      println("Published data \ntopic: " + msgTopic.getName() + "\nMessage: " + message)
    +      client.disconnect()
    +    } catch {
    +      case e: MqttException => println("Exception Caught: " + e)
    +    }
    +  }
    +}
     
    -  private val master: String = "local[2]"
    -
    -  private val framework: String = this.getClass.getSimpleName
    -
    +class MQTTStreamSuite extends MQTTStreamSuiteBase {
       test("mqtt input stream") {
         val ssc = new StreamingContext(master, framework, batchDuration)
    --- End diff --
    
    This can leave the SparkContext running if the unit test throws an exception, resulting in a leaked SparkContext. Take a look at how KafkaStreamSuite handles this with BeforeAndAfter.
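
To illustrate why the comment above asks for setup and teardown hooks rather than creating the context inside the test body, here is a small sketch. `FakeContext` and `FixtureSketch` are hypothetical names standing in for `StreamingContext` and a test framework's lifecycle; the sketch does not use ScalaTest itself.

```scala
// Hypothetical stand-in for a StreamingContext; it only tracks whether it was stopped.
final class FakeContext {
  var stopped = false
  def stop(): Unit = { stopped = true }
}

object FixtureSketch {
  var ctx: FakeContext = null

  // Setup hook: create a fresh context for each test.
  def before(): Unit = { ctx = new FakeContext }

  // Teardown hook: stop the context if one exists.
  def after(): Unit = {
    if (ctx != null) { ctx.stop(); ctx = null }
  }

  // Runs setup, the test body, and teardown; teardown runs even if the body throws.
  def runTest(body: FakeContext => Unit): Unit = {
    before()
    try body(ctx) finally after()
  }
}
```

Because the teardown sits outside the test body, a failing assertion cannot skip it, which is the property the review comment is after.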




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by Bilna <gi...@git.apache.org>.
Github user Bilna commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-69694551
  
    OK, I will look into it.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22372739
  
    --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
    @@ -17,31 +17,58 @@
     
     package org.apache.spark.streaming.mqtt
     
    +import org.apache.spark.Logging
     import org.scalatest.FunSuite
    -
    +import org.scalatest.concurrent.Eventually
    +import scala.concurrent.duration._
     import org.apache.spark.streaming.{Seconds, StreamingContext}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
     
    -class MQTTStreamSuite extends FunSuite {
    +abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging {
    --- End diff --
    
    Why create this class if it is used by only one other class? Are you planning to add another test suite, which would justify this abstract class (like KafkaStreamSuiteBase, which is used by both KafkaStreamSuite and ReliableKafkaStreamSuite)?




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22459212
  
    --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
    @@ -17,31 +17,111 @@
     
     package org.apache.spark.streaming.mqtt
     
    -import org.scalatest.FunSuite
    +import java.net.{URI, ServerSocket}
     
    -import org.apache.spark.streaming.{Seconds, StreamingContext}
    +import org.apache.activemq.broker.{TransportConnector, BrokerService}
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually
    +import scala.concurrent.duration._
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
     
    -class MQTTStreamSuite extends FunSuite {
    -
    -  val batchDuration = Seconds(1)
    +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
     
    +  private val batchDuration = Milliseconds(500)
       private val master: String = "local[2]"
    -
       private val framework: String = this.getClass.getSimpleName
    +  private val freePort = findFreePort()
    +  private val brokerUri = "//localhost:" + freePort
    +  private val topic = "def"
    +  private var ssc: StreamingContext = _
    +  private val persistenceDir = Utils.createTempDir()
    +  private var broker: BrokerService = _
    +  private var connector: TransportConnector = _
     
    -  test("mqtt input stream") {
    -    val ssc = new StreamingContext(master, framework, batchDuration)
    -    val brokerUrl = "abc"
    -    val topic = "def"
    +  before {
    +    ssc = new StreamingContext(master, framework, batchDuration)
    +    setupMQTT()
    +  }
     
    -    // tests the API, does not actually test data receiving
    -    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
    -    val test2: ReceiverInputDStream[String] =
    -      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +      ssc = null
    +    }
    +    Utils.deleteRecursively(persistenceDir)
    +    tearDownMQTT()
    +  }
     
    -    // TODO: Actually test receiving data
    +  test("mqtt input stream") {
    +    val sendMessage = "MQTT demo for spark streaming"
    +    val receiveStream: ReceiverInputDStream[String] =
    +      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
    +    var receiveMessage: List[String] = List()
    +    receiveStream.foreachRDD { rdd =>
    +      if (rdd.collect.length > 0) {
    +        receiveMessage = receiveMessage ::: List(rdd.first)
    +        receiveMessage
    +      }
    +    }
    +    ssc.start()
    +    publishData(sendMessage)
    +    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
    +      assert(sendMessage.equals(receiveMessage(0)))
    +    }
         ssc.stop()
       }
    +
    +  private def setupMQTT() {
    +    broker = new BrokerService()
    +    connector = new TransportConnector()
    +    connector.setName("mqtt")
    +    connector.setUri(new URI("mqtt:" + brokerUri))
    +    broker.addConnector(connector)
    +    broker.start()
    +  }
    +
    +  private def tearDownMQTT() {
    +    if (broker != null) {
    +      broker.stop()
    +      broker = null
    +    }
    +    if (connector != null) {
    +      connector.stop()
    +      connector = null
    +    }
    +  }
    +
    +  private def findFreePort(): Int = {
    +    Utils.startServiceOnPort(23456, (trialPort: Int) => {
    +      val socket = new ServerSocket(trialPort)
    +      socket.close()
    +      (null, trialPort)
    +    })._2
    +  }
    +
    +  def publishData(data: String): Unit = {
    +    var client: MqttClient = null
    +    try {
    +      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
    +      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
    +      client.connect()
    +      if (client.isConnected) {
    +        val msgTopic: MqttTopic = client.getTopic(topic)
    +        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
    +        message.setQos(1)
    +        message.setRetained(true)
    +        for (i <- 0 to 100)
    +          msgTopic.publish(message)
    --- End diff --
    
    ```
    for (...) {
        msgTopic.publish(message)
    }
    ```
    
    A code block like this should either be on one line or be enclosed in braces.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68643164
  
      [Test build #25035 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25035/consoleFull) for   PR 3844 at commit [`acea3a3`](https://github.com/apache/spark/commit/acea3a31eba9d0853cb7484a16f8916219057be0).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68429312
  
      [Test build #24949 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24949/consoleFull) for   PR 3844 at commit [`4b58094`](https://github.com/apache/spark/commit/4b580943de5137e947d1a6cdadd054020932ed8e).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter `





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68429316
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24949/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68581068
  
    This is almost looking good. A few more comments and we are ready. :)




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68616859
  
    Only one more comment. 




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68428989
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24948/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by dragos <gi...@git.apache.org>.
Github user dragos commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-71858250
  
    @srowen I commented on the ticket, but I can't re-open it.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-71854715
  
    @dragos good catch. It sounds like an issue with the test if anything. You could just reopen SPARK-4631 with a workaround.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68525682
  
      [Test build #24996 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24996/consoleFull) for   PR 3844 at commit [`4b34ee7`](https://github.com/apache/spark/commit/4b34ee784e7c9c489cf0c22d73311c160bc67c47).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/3844




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68646513
  
      [Test build #25035 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25035/consoleFull) for   PR 3844 at commit [`acea3a3`](https://github.com/apache/spark/commit/acea3a31eba9d0853cb7484a16f8916219057be0).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter `





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by Bilna <gi...@git.apache.org>.
Github user Bilna commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68686800
  
    @tdas, Thanks ....





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68426613
  
      [Test build #24946 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24946/consoleFull) for   PR 3844 at commit [`5f6bfd2`](https://github.com/apache/spark/commit/5f6bfd2f4b11c08e76d02ccf5a5594151ccd9af5).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68419759
  
      [Test build #24922 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24922/consoleFull) for   PR 3844 at commit [`e8b6623`](https://github.com/apache/spark/commit/e8b6623e5bd31fcb583fdeae5f1c954be672403d).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging `
      * `class MQTTStreamSuite extends MQTTStreamSuiteBase `





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68428438
  
      [Test build #24946 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24946/consoleFull) for   PR 3844 at commit [`5f6bfd2`](https://github.com/apache/spark/commit/5f6bfd2f4b11c08e76d02ccf5a5594151ccd9af5).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter `





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68355440
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68428985
  
      [Test build #24948 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24948/consoleFull) for   PR 3844 at commit [`b1ac4ad`](https://github.com/apache/spark/commit/b1ac4ad62ff6d537f669699d5da49bc4ee1ab154).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter `





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22446708
  
    --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
    @@ -17,31 +17,111 @@
     
     package org.apache.spark.streaming.mqtt
     
    -import org.scalatest.FunSuite
    +import java.net.{URI, ServerSocket}
     
    -import org.apache.spark.streaming.{Seconds, StreamingContext}
    +import org.apache.activemq.broker.{TransportConnector, BrokerService}
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually
    +import scala.concurrent.duration._
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
     
    -class MQTTStreamSuite extends FunSuite {
    -
    -  val batchDuration = Seconds(1)
    +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
     
    +  private val batchDuration = Milliseconds(500)
       private val master: String = "local[2]"
    -
       private val framework: String = this.getClass.getSimpleName
    +  private val freePort = findFreePort()
    +  private val brokerUri = "//localhost:" + freePort
    +  private val topic = "def"
    +  private var ssc: StreamingContext = _
    +  private val persistenceDir = Utils.createTempDir()
    +  private var broker: BrokerService = _
    +  private var connector: TransportConnector = _
     
    -  test("mqtt input stream") {
    -    val ssc = new StreamingContext(master, framework, batchDuration)
    -    val brokerUrl = "abc"
    -    val topic = "def"
    +  before {
    +    ssc = new StreamingContext(master, framework, batchDuration)
    +    setupMQTT()
    +  }
     
    -    // tests the API, does not actually test data receiving
    -    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
    -    val test2: ReceiverInputDStream[String] =
    -      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +      ssc = null
    +    }
    +    Utils.deleteRecursively(persistenceDir)
    +    tearDownMQTT()
    +  }
     
    -    // TODO: Actually test receiving data
    +  test("mqtt input stream") {
    +    val sendMessage = "MQTT demo for spark streaming"
    +    val receiveStream: ReceiverInputDStream[String] =
    +      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
    +    var receiveMessage: List[String] = List()
    +    receiveStream.foreachRDD { rdd =>
    +      if (rdd.collect.length > 0) {
    +        receiveMessage = receiveMessage ::: List(rdd.first)
    +        receiveMessage
    +      }
    +    }
    +    ssc.start()
    +    publishData(sendMessage)
    +    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
    +      assert(sendMessage.equals(receiveMessage(0)))
    +    }
         ssc.stop()
       }
    +
    +  private def setupMQTT() {
    +    broker = new BrokerService()
    +    connector = new TransportConnector()
    +    connector.setName("mqtt")
    +    connector.setUri(new URI("mqtt:" + brokerUri))
    +    broker.addConnector(connector)
    +    broker.start()
    +  }
    +
    +  private def tearDownMQTT() {
    +    if (broker != null) {
    +      broker.stop()
    +      broker = null
    +    }
    +    if (connector != null) {
    +      connector.stop()
    +      connector = null
    +    }
    +  }
    +
    +  private def findFreePort(): Int = {
    +    Utils.startServiceOnPort(23456, (trialPort: Int) => {
    +      val socket = new ServerSocket(trialPort)
    +      socket.close()
    +      (null, trialPort)
    +    })._2
    +  }
    +
    +  def publishData(data: String): Unit = {
    +    var client: MqttClient = null
    +    try {
    +      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
    +      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
    +      client.connect()
    +      if (client.isConnected) {
    +        val msgTopic: MqttTopic = client.getTopic(topic)
    +        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
    +        message.setQos(1)
    +        message.setRetained(true)
    +        for (i <- 0 to 100)
    +          msgTopic.publish(message)
    --- End diff --
    
    nit: I missed this in the last pass, but this violates the Scala style that we follow. I won't block this PR for this.
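
    Assuming the nit refers to the multi-line `for` body written without braces at the end of the quoted diff, the braced form would look like the sketch below. The names `BracedForLoop` and `publishAll` are hypothetical, and the `publish` callback only stands in for `msgTopic.publish(message)`.

```scala
import scala.collection.mutable.ArrayBuffer

object BracedForLoop {
  // Hypothetical helper: calls `publish` once per index, with the loop
  // body wrapped in braces even though it is a single statement.
  def publishAll(count: Int, publish: Int => Unit): Unit = {
    for (i <- 0 until count) {
      publish(i)
    }
  }

  def main(args: Array[String]): Unit = {
    val sent = ArrayBuffer.empty[Int]
    publishAll(3, i => { sent += i; () })
    println(sent.mkString(",")) // prints "0,1,2"
  }
}
```

    Note that `0 until count` runs exactly `count` times, whereas the `0 to 100` range in the diff is inclusive and runs 101 times.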




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68587233
  
      [Test build #25007 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25007/consoleFull) for   PR 3844 at commit [`fac3904`](https://github.com/apache/spark/commit/fac3904a8e702722acca2a0e7217c5440ecda84a).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22427587
  
    --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
    @@ -17,31 +17,114 @@
     
     package org.apache.spark.streaming.mqtt
     
    -import org.scalatest.FunSuite
    +import java.net.{URI, ServerSocket}
     
    -import org.apache.spark.streaming.{Seconds, StreamingContext}
    +import org.apache.activemq.broker.{TransportConnector, BrokerService}
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually
    +import scala.concurrent.duration._
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
     
    -class MQTTStreamSuite extends FunSuite {
    -
    -  val batchDuration = Seconds(1)
    +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
     
    +  private val batchDuration = Milliseconds(500)
       private val master: String = "local[2]"
    -
       private val framework: String = this.getClass.getSimpleName
    +  private val freePort = findFreePort()
    +  private val brokerUri = "//localhost:" + freePort
    +  private val topic = "def"
    +  private var ssc: StreamingContext = _
    +  private val persistenceDir = Utils.createTempDir()
    +  private var broker: BrokerService = _
    +  private var connector: TransportConnector = _
     
    -  test("mqtt input stream") {
    -    val ssc = new StreamingContext(master, framework, batchDuration)
    -    val brokerUrl = "abc"
    -    val topic = "def"
    +  before {
    +    ssc = new StreamingContext(master, framework, batchDuration)
    +    setupMQTT
    --- End diff --
    
    nit: this should be `setupMQTT()`. See Scala style guide - http://docs.scala-lang.org/style/method-invocation.html#arity0 
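
    The linked guideline can be illustrated with a small sketch. The names `Arity0Example`, `setupBroker`, and `isStarted` are hypothetical and only stand in for methods such as `setupMQTT`: a zero-argument method with side effects is declared and invoked with parentheses, while a pure accessor omits them.

```scala
object Arity0Example {
  private var started = false

  // Side-effecting: declared with () and called with ().
  def setupBroker(): Unit = {
    started = true
  }

  // Pure accessor: declared and called without parentheses.
  def isStarted: Boolean = started

  def main(args: Array[String]): Unit = {
    setupBroker()
    println(isStarted) // prints "true"
  }
}
```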




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68428440
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24946/




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by dragos <gi...@git.apache.org>.
Github user dragos commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-72061693
  
    See #4270 




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68665022
  
    Merging this, thanks!




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68525686
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24996/




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22397269
  
    --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
    @@ -17,31 +17,65 @@
     
     package org.apache.spark.streaming.mqtt
     
    -import org.scalatest.FunSuite
    -
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually
    +import scala.concurrent.duration._
     import org.apache.spark.streaming.{Seconds, StreamingContext}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
     
    -class MQTTStreamSuite extends FunSuite {
    -
    -  val batchDuration = Seconds(1)
    +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
     
    +  private val batchDuration = Seconds(1)
       private val master: String = "local[2]"
    -
       private val framework: String = this.getClass.getSimpleName
    +  private val brokerUrl = "tcp://localhost:1883"
    --- End diff --
    
    Who is running the broker? Also, this port is hardcoded. There is a small but non-trivial chance that this port is not free (in Jenkins, where multiple series of tests may be running in parallel), causing the server to fail to bind and thus failing the test. Can you find a free port (see [FlumeStreamSuite](https://github.com/apache/spark/blob/master/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala#L78)) and use that instead?
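
    One way to avoid a hardcoded port is to ask the operating system for an unused one. The sketch below is a minimal, self-contained illustration that uses only `java.net.ServerSocket`; the object name `FreePort` and the helper `findFreePort` are hypothetical names chosen for this example, and it is not necessarily how FlumeStreamSuite does it.

```scala
import java.net.ServerSocket

object FreePort {
  // Binding to port 0 lets the OS pick a currently unused ephemeral port.
  // The socket is closed right away so the caller can bind the port itself.
  def findFreePort(): Int = {
    val socket = new ServerSocket(0)
    try socket.getLocalPort
    finally socket.close()
  }

  def main(args: Array[String]): Unit = {
    val port = findFreePort()
    // Build a broker URL from the discovered port instead of a fixed one.
    println(s"tcp://localhost:$port")
  }
}
```

    Because the socket is closed before the broker binds, another process could in principle take the port in between, so this narrows the race rather than eliminating it.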




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by Bilna <gi...@git.apache.org>.
Github user Bilna commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22405599
  
    --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
    @@ -17,31 +17,65 @@
     
     package org.apache.spark.streaming.mqtt
     
    -import org.scalatest.FunSuite
    -
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually
    +import scala.concurrent.duration._
     import org.apache.spark.streaming.{Seconds, StreamingContext}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
     
    -class MQTTStreamSuite extends FunSuite {
    -
    -  val batchDuration = Seconds(1)
    +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
     
    +  private val batchDuration = Seconds(1)
       private val master: String = "local[2]"
    -
       private val framework: String = this.getClass.getSimpleName
    +  private val brokerUrl = "tcp://localhost:1883"
    --- End diff --
    
    TCP/IP port 1883 is reserved with IANA for use with MQTT. That is why I hardcoded it.
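
    Registration with IANA reserves the number for the protocol, but it does not stop another process on the same machine from already listening on that port. A minimal sketch of that failure mode, assuming default socket options (the object name `PortClash` is hypothetical, and an OS-assigned port stands in for 1883 so the example does not depend on what is running locally):

```scala
import java.net.{BindException, ServerSocket}

object PortClash {
  // Returns true if a second listener cannot bind a port that is already held.
  def secondBindFails(): Boolean = {
    val first = new ServerSocket(0) // first listener holds the port
    try {
      val port = first.getLocalPort
      try {
        // A second listener on the same port, e.g. a parallel test run.
        new ServerSocket(port).close()
        false
      } catch {
        case _: BindException => true
      }
    } finally {
      first.close()
    }
  }

  def main(args: Array[String]): Unit = {
    println(secondBindFails())
  }
}
```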




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22427535
  
    --- Diff: external/mqtt/pom.xml ---
    @@ -16,65 +16,71 @@
       ~ limitations under the License.
       -->
     
    --- End diff --
    
    This file should not change completely like this. I think there have been some incorrect changes in the indentation. Please revert/fix this. If any changes related to test scope (for the unit test) are necessary, only those lines should change.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68646517
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25035/




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68589126
  
      [Test build #25007 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25007/consoleFull) for   PR 3844 at commit [`fac3904`](https://github.com/apache/spark/commit/fac3904a8e702722acca2a0e7217c5440ecda84a).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter `





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68427068
  
      [Test build #24948 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24948/consoleFull) for   PR 3844 at commit [`b1ac4ad`](https://github.com/apache/spark/commit/b1ac4ad62ff6d537f669699d5da49bc4ee1ab154).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68435866
  
      [Test build #24956 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24956/consoleFull) for   PR 3844 at commit [`fc8eb28`](https://github.com/apache/spark/commit/fc8eb286db6aa8e78a567537996011f554eed969).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter `





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68433088
  
      [Test build #24956 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24956/consoleFull) for   PR 3844 at commit [`fc8eb28`](https://github.com/apache/spark/commit/fc8eb286db6aa8e78a567537996011f554eed969).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68417591
  
      [Test build #24922 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24922/consoleFull) for   PR 3844 at commit [`e8b6623`](https://github.com/apache/spark/commit/e8b6623e5bd31fcb583fdeae5f1c954be672403d).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by Bilna <gi...@git.apache.org>.
Github user Bilna commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22453522
  
    --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
    @@ -17,31 +17,111 @@
     
     package org.apache.spark.streaming.mqtt
     
    -import org.scalatest.FunSuite
    +import java.net.{URI, ServerSocket}
     
    -import org.apache.spark.streaming.{Seconds, StreamingContext}
    +import org.apache.activemq.broker.{TransportConnector, BrokerService}
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually
    +import scala.concurrent.duration._
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
     
    -class MQTTStreamSuite extends FunSuite {
    -
    -  val batchDuration = Seconds(1)
    +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
     
    +  private val batchDuration = Milliseconds(500)
       private val master: String = "local[2]"
    -
       private val framework: String = this.getClass.getSimpleName
    +  private val freePort = findFreePort()
    +  private val brokerUri = "//localhost:" + freePort
    +  private val topic = "def"
    +  private var ssc: StreamingContext = _
    +  private val persistenceDir = Utils.createTempDir()
    +  private var broker: BrokerService = _
    +  private var connector: TransportConnector = _
     
    -  test("mqtt input stream") {
    -    val ssc = new StreamingContext(master, framework, batchDuration)
    -    val brokerUrl = "abc"
    -    val topic = "def"
    +  before {
    +    ssc = new StreamingContext(master, framework, batchDuration)
    +    setupMQTT()
    +  }
     
    -    // tests the API, does not actually test data receiving
    -    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
    -    val test2: ReceiverInputDStream[String] =
    -      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +      ssc = null
    +    }
    +    Utils.deleteRecursively(persistenceDir)
    +    tearDownMQTT()
    +  }
     
    -    // TODO: Actually test receiving data
    +  test("mqtt input stream") {
    +    val sendMessage = "MQTT demo for spark streaming"
    +    val receiveStream: ReceiverInputDStream[String] =
    +      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
    +    var receiveMessage: List[String] = List()
    +    receiveStream.foreachRDD { rdd =>
    +      if (rdd.collect.length > 0) {
    +        receiveMessage = receiveMessage ::: List(rdd.first)
    +        receiveMessage
    +      }
    +    }
    +    ssc.start()
    +    publishData(sendMessage)
    +    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
    +      assert(sendMessage.equals(receiveMessage(0)))
    +    }
         ssc.stop()
       }
    +
    +  private def setupMQTT() {
    +    broker = new BrokerService()
    +    connector = new TransportConnector()
    +    connector.setName("mqtt")
    +    connector.setUri(new URI("mqtt:" + brokerUri))
    +    broker.addConnector(connector)
    +    broker.start()
    +  }
    +
    +  private def tearDownMQTT() {
    +    if (broker != null) {
    +      broker.stop()
    +      broker = null
    +    }
    +    if (connector != null) {
    +      connector.stop()
    +      connector = null
    +    }
    +  }
    +
    +  private def findFreePort(): Int = {
    +    Utils.startServiceOnPort(23456, (trialPort: Int) => {
    +      val socket = new ServerSocket(trialPort)
    +      socket.close()
    +      (null, trialPort)
    +    })._2
    +  }
    +
    +  def publishData(data: String): Unit = {
    +    var client: MqttClient = null
    +    try {
    +      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
    +      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
    +      client.connect()
    +      if (client.isConnected) {
    +        val msgTopic: MqttTopic = client.getTopic(topic)
    +        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
    +        message.setQos(1)
    +        message.setRetained(true)
    +        for (i <- 0 to 100)
    +          msgTopic.publish(message)
    --- End diff --
    
    Can you explain what the correction is here? I just want to understand what went wrong.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68417426
  
    Jenkins, this is ok to test.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68525516
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24994/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22427641
  
    --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
    @@ -17,31 +17,114 @@
     
     package org.apache.spark.streaming.mqtt
     
    -import org.scalatest.FunSuite
    +import java.net.{URI, ServerSocket}
     
    -import org.apache.spark.streaming.{Seconds, StreamingContext}
    +import org.apache.activemq.broker.{TransportConnector, BrokerService}
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually
    +import scala.concurrent.duration._
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
     
    -class MQTTStreamSuite extends FunSuite {
    -
    -  val batchDuration = Seconds(1)
    +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
     
    +  private val batchDuration = Milliseconds(500)
       private val master: String = "local[2]"
    -
       private val framework: String = this.getClass.getSimpleName
    +  private val freePort = findFreePort()
    +  private val brokerUri = "//localhost:" + freePort
    +  private val topic = "def"
    +  private var ssc: StreamingContext = _
    +  private val persistenceDir = Utils.createTempDir()
    +  private var broker: BrokerService = _
    +  private var connector: TransportConnector = _
     
    -  test("mqtt input stream") {
    -    val ssc = new StreamingContext(master, framework, batchDuration)
    -    val brokerUrl = "abc"
    -    val topic = "def"
    +  before {
    +    ssc = new StreamingContext(master, framework, batchDuration)
    +    setupMQTT
    +  }
     
    -    // tests the API, does not actually test data receiving
    -    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
    -    val test2: ReceiverInputDStream[String] =
    -      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +      ssc = null
    +    }
    +    Utils.deleteRecursively(persistenceDir)
    +    tearDownMQTT
    +  }
     
    -    // TODO: Actually test receiving data
    +  test("mqtt input stream") {
    +    val sendMessage = "MQTT demo for spark streaming"
    +    val receiveStream: ReceiverInputDStream[String] =
    +      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
    +    var receiveMessage: List[String] = List()
    +    receiveStream.foreachRDD { rdd =>
    +      if (rdd.collect.length > 0) {
    +        receiveMessage = receiveMessage ::: List(rdd.first)
    +        receiveMessage
    +      }
    +    }
    +    ssc.start()
    +    publishData(sendMessage)
    +    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
    +      assert(sendMessage.equals(receiveMessage(0)))
    +    }
         ssc.stop()
       }
    +
    +  private def setupMQTT() {
    +    broker = new BrokerService()
    +    connector = new TransportConnector()
    +    connector.setName("mqtt")
    +    connector.setUri(new URI("mqtt:" + brokerUri))
    +    broker.addConnector(connector)
    +    broker.start()
    +  }
    +
    +  private def tearDownMQTT() {
    +    if (broker != null) {
    +      broker.stop()
    +      broker = null
    +    }
    +    if (connector != null) {
    +      connector.stop()
    +      connector = null
    +    }
    +  }
    +
    +  private def findFreePort(): Int = {
    +    Utils.startServiceOnPort(23456, (trialPort: Int) => {
    +      val socket = new ServerSocket(trialPort)
    +      socket.close()
    +      (null, trialPort)
    +    })._2
    +  }
    +
    +  def publishData(data: String): Unit = {
    +    var client: MqttClient = null
    +    try {
    +      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
    +      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
    +      client.connect()
    +      if (client.isConnected) {
    +        val msgTopic: MqttTopic = client.getTopic(topic)
    +        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
    +        message.setQos(1)
    +        message.setRetained(true)
    +        for (i <- 0 to 10)
    +          msgTopic.publish(message)
    +      }
    +    } catch {
    +      case e: MqttException => println("Exception Caught: " + e)
    +    }
    --- End diff --
    
    nit: `finally` should be on the previous line, after the `}`.
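
For readers unfamiliar with the layout this nit asks for: the `catch` and `finally` keywords go on the same line as the closing brace of the preceding block. A minimal sketch, assuming a hypothetical `FakeClient` in place of the Paho `MqttClient` used in the diff so that it runs without any MQTT library:

```scala
// FakeClient is a hypothetical stand-in for MqttClient (not part of Paho or Spark).
final class FakeClient {
  var connected = false
  def connect(): Unit = { connected = true }
  def publish(msg: String): Unit = {
    if (!connected) throw new IllegalStateException("not connected")
  }
  def disconnect(): Unit = { connected = false }
}

object FinallyPlacement {
  // Returns true if the message was published; always disconnects the client.
  def publishData(client: FakeClient, data: String): Boolean = {
    try {
      client.connect()
      client.publish(data)
      true
    } catch {
      case e: IllegalStateException =>
        println("Exception caught: " + e)
        false
    } finally {
      // `finally` sits on the same line as the `}` that closes `catch`.
      client.disconnect()
    }
  }
}
```

The `finally` block runs on both the success and the failure path, so the client is disconnected either way.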




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22427589
  
    --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
    @@ -17,31 +17,114 @@
     
     package org.apache.spark.streaming.mqtt
     
    -import org.scalatest.FunSuite
    +import java.net.{URI, ServerSocket}
     
    -import org.apache.spark.streaming.{Seconds, StreamingContext}
    +import org.apache.activemq.broker.{TransportConnector, BrokerService}
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually
    +import scala.concurrent.duration._
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
     
    -class MQTTStreamSuite extends FunSuite {
    -
    -  val batchDuration = Seconds(1)
    +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
     
    +  private val batchDuration = Milliseconds(500)
       private val master: String = "local[2]"
    -
       private val framework: String = this.getClass.getSimpleName
    +  private val freePort = findFreePort()
    +  private val brokerUri = "//localhost:" + freePort
    +  private val topic = "def"
    +  private var ssc: StreamingContext = _
    +  private val persistenceDir = Utils.createTempDir()
    +  private var broker: BrokerService = _
    +  private var connector: TransportConnector = _
     
    -  test("mqtt input stream") {
    -    val ssc = new StreamingContext(master, framework, batchDuration)
    -    val brokerUrl = "abc"
    -    val topic = "def"
    +  before {
    +    ssc = new StreamingContext(master, framework, batchDuration)
    +    setupMQTT
    +  }
     
    -    // tests the API, does not actually test data receiving
    -    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
    -    val test2: ReceiverInputDStream[String] =
    -      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +      ssc = null
    +    }
    +    Utils.deleteRecursively(persistenceDir)
    +    tearDownMQTT
    --- End diff --
    
    Same comment as for `setupMQTT`
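
The `setupMQTT` comment itself is not shown in this part of the thread. Other revisions of the diff quoted here call `setupMQTT()` and `tearDownMQTT()` with parentheses, which matches the common Scala convention that methods with side effects are declared and called with `()`. A minimal sketch of that convention, with hypothetical names:

```scala
// ParensConvention and its members are hypothetical, for illustration only.
object ParensConvention {
  private var running = false

  // Side-effecting methods: declared with () and called with ().
  def start(): Unit = { running = true }
  def stop(): Unit = { running = false }

  // Pure accessor: no parentheses at the declaration or the call site.
  def isRunning: Boolean = running
}
```

A call site then reads `ParensConvention.start()` for the action and `ParensConvention.isRunning` for the query, so the parentheses signal which calls change state.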




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by prabeesh <gi...@git.apache.org>.
Github user prabeesh commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68355645
  
    @tdas verify this patch




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-69675455
  
    It looks like there may be a port-binding race condition here?
    
    https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=centos/1356/testReport/
    
    ```
    sbt.ForkMain$ForkError: Transport Connector could not be registered in JMX: Failed to bind to server socket: mqtt://localhost:23456 due to: java.net.BindException: Address already in use
    	at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:27)
    	at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:1977)
    	at org.apache.activemq.broker.BrokerService.startTransportConnector(BrokerService.java:2468)
    	at org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2385)
    	at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:684)
    	at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:642)
    	at org.apache.activemq.broker.BrokerService.start(BrokerService.java:578)
    	at org.apache.spark.streaming.mqtt.MQTTStreamSuite.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$setupMQTT(MQTTStreamSuite.scala:90)
    	at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$1.apply$mcV$sp(MQTTStreamSuite.scala:53)
    	at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$1.apply(MQTTStreamSuite.scala:51)
    	at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$1.apply(MQTTStreamSuite.scala:51)
    	at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:195)
    	at org.apache.spark.streaming.mqtt.MQTTStreamSuite.runTest(MQTTStreamSuite.scala:37)
    	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
    	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    	at scala.collection.immutable.List.foreach(List.scala:318)
    	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
    	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
    	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
    	at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
    	at org.scalatest.Suite$class.run(Suite.scala:1424)
    	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
    	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    	at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
    	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
    	at org.apache.spark.streaming.mqtt.MQTTStreamSuite.org$scalatest$BeforeAndAfter$$super$run(MQTTStreamSuite.scala:37)
    	at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241)
    	at org.apache.spark.streaming.mqtt.MQTTStreamSuite.run(MQTTStreamSuite.scala:37)
    	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
    	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
    	at sbt.ForkMain$Run$2.call(ForkMain.java:294)
    	at sbt.ForkMain$Run$2.call(ForkMain.java:284)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    	at java.lang.Thread.run(Thread.java:745)
    Caused by: sbt.ForkMain$ForkError: Failed to bind to server socket: mqtt://localhost:23456 due to: java.net.BindException: Address already in use
    	at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:33)
    	at org.apache.activemq.transport.tcp.TcpTransportServer.bind(TcpTransportServer.java:138)
    	at org.apache.activemq.transport.tcp.TcpTransportFactory.doBind(TcpTransportFactory.java:60)
    	at org.apache.activemq.transport.TransportFactory.bind(TransportFactory.java:124)
    	at org.apache.activemq.broker.TransportConnector.createTransportServer(TransportConnector.java:310)
    	at org.apache.activemq.broker.TransportConnector.getServer(TransportConnector.java:136)
    	at org.apache.activemq.broker.TransportConnector.asManagedConnector(TransportConnector.java:105)
    	at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:1972)
    	... 38 more
    Caused by: sbt.ForkMain$ForkError: Address already in use
    	at java.net.PlainSocketImpl.socketBind(Native Method)
    	at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:376)
    	at java.net.ServerSocket.bind(ServerSocket.java:376)
    	at java.net.ServerSocket.<init>(ServerSocket.java:237)
    	at javax.net.DefaultServerSocketFactory.createServerSocket(ServerSocketFactory.java:231)
    	at org.apache.activemq.transport.tcp.TcpTransportServer.bind(TcpTransportServer.java:134)
    	... 44 more
    ```
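
The trace is consistent with a check-then-bind race: `findFreePort()` in the diff opens a `ServerSocket` on a trial port starting at 23456, closes it, and only later does the broker bind that port, so another process can claim it in between. One common way to avoid the gap, sketched here under the assumption that the service can bind port 0 itself or take over an open socket, is to let the OS choose an ephemeral port and hold the socket. `EphemeralPort` and `bindToFreePort` are hypothetical names, not Spark or ActiveMQ APIs:

```scala
import java.net.{InetAddress, ServerSocket}

object EphemeralPort {
  // Binding to port 0 asks the OS for any free port. The caller keeps the
  // socket, so no other process can take the port while it stays open.
  def bindToFreePort(): ServerSocket =
    new ServerSocket(0, 50, InetAddress.getLoopbackAddress)

  def main(args: Array[String]): Unit = {
    val socket = bindToFreePort()
    try {
      println("bound to port " + socket.getLocalPort)
    } finally {
      socket.close()
    }
  }
}
```

Closing the socket and handing only the port number to another component reopens the race, although an OS-assigned ephemeral port is less likely to be contested than a fixed starting port.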




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3844#issuecomment-68435869
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24956/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22435908
  
    --- Diff: external/mqtt/pom.xml ---
    @@ -47,6 +47,11 @@
           <version>1.0.1</version>
         </dependency>
         <dependency>
    +      <groupId>org.apache.activemq</groupId>
    +      <artifactId>activemq-core</artifactId>
    +      <version>5.7.0</version>
    +    </dependency>
    --- End diff --
    
    Is this dependency needed only for the unit test? If so, it should be added in test scope (see the scalatest dependency below). We try to avoid changing dependencies, because any such change can conflict with Spark's other dependencies and cause unforeseen failures. So if this is needed only for tests, please put it in test scope.
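
In Maven terms, the request amounts to adding a `<scope>` element to the dependency from the diff. A sketch that reuses the coordinates shown above and assumes nothing else in the module needs ActiveMQ at compile or run time:

```xml
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>5.7.0</version>
  <!-- test scope: available when compiling and running tests only -->
  <scope>test</scope>
</dependency>
```

With test scope the artifact is not passed on to projects that depend on this module, which limits the risk of dependency conflicts raised in the comment.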




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by Bilna <gi...@git.apache.org>.
Github user Bilna commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22507984
  
    --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
    @@ -17,31 +17,111 @@
     
     package org.apache.spark.streaming.mqtt
     
    -import org.scalatest.FunSuite
    +import java.net.{URI, ServerSocket}
     
    -import org.apache.spark.streaming.{Seconds, StreamingContext}
    +import org.apache.activemq.broker.{TransportConnector, BrokerService}
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually
    +import scala.concurrent.duration._
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
     
    -class MQTTStreamSuite extends FunSuite {
    -
    -  val batchDuration = Seconds(1)
    +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
     
    +  private val batchDuration = Milliseconds(500)
       private val master: String = "local[2]"
    -
       private val framework: String = this.getClass.getSimpleName
    +  private val freePort = findFreePort()
    +  private val brokerUri = "//localhost:" + freePort
    +  private val topic = "def"
    +  private var ssc: StreamingContext = _
    +  private val persistenceDir = Utils.createTempDir()
    +  private var broker: BrokerService = _
    +  private var connector: TransportConnector = _
     
    -  test("mqtt input stream") {
    -    val ssc = new StreamingContext(master, framework, batchDuration)
    -    val brokerUrl = "abc"
    -    val topic = "def"
    +  before {
    +    ssc = new StreamingContext(master, framework, batchDuration)
    +    setupMQTT()
    +  }
     
    -    // tests the API, does not actually test data receiving
    -    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
    -    val test2: ReceiverInputDStream[String] =
    -      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +      ssc = null
    +    }
    +    Utils.deleteRecursively(persistenceDir)
    +    tearDownMQTT()
    +  }
     
    -    // TODO: Actually test receiving data
    +  test("mqtt input stream") {
    +    val sendMessage = "MQTT demo for spark streaming"
    +    val receiveStream: ReceiverInputDStream[String] =
    +      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
    +    var receiveMessage: List[String] = List()
    +    receiveStream.foreachRDD { rdd =>
    +      if (rdd.collect.length > 0) {
    +        receiveMessage = receiveMessage ::: List(rdd.first)
    +        receiveMessage
    +      }
    +    }
    +    ssc.start()
    +    publishData(sendMessage)
    +    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
    +      assert(sendMessage.equals(receiveMessage(0)))
    +    }
         ssc.stop()
       }
    +
    +  private def setupMQTT() {
    +    broker = new BrokerService()
    +    connector = new TransportConnector()
    +    connector.setName("mqtt")
    +    connector.setUri(new URI("mqtt:" + brokerUri))
    +    broker.addConnector(connector)
    +    broker.start()
    +  }
    +
    +  private def tearDownMQTT() {
    +    if (broker != null) {
    +      broker.stop()
    +      broker = null
    +    }
    +    if (connector != null) {
    +      connector.stop()
    +      connector = null
    +    }
    +  }
    +
    +  private def findFreePort(): Int = {
    +    Utils.startServiceOnPort(23456, (trialPort: Int) => {
    +      val socket = new ServerSocket(trialPort)
    +      socket.close()
    +      (null, trialPort)
    +    })._2
    +  }
    +
    +  def publishData(data: String): Unit = {
    +    var client: MqttClient = null
    +    try {
    +      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
    +      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
    +      client.connect()
    +      if (client.isConnected) {
    +        val msgTopic: MqttTopic = client.getTopic(topic)
    +        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
    +        message.setQos(1)
    +        message.setRetained(true)
    +        for (i <- 0 to 100)
    +          msgTopic.publish(message)
    --- End diff --
    
    ok.. thanks.




[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3844#discussion_r22429050
  
    --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
    @@ -17,31 +17,114 @@
     
     package org.apache.spark.streaming.mqtt
     
    -import org.scalatest.FunSuite
    +import java.net.{URI, ServerSocket}
     
    -import org.apache.spark.streaming.{Seconds, StreamingContext}
    +import org.apache.activemq.broker.{TransportConnector, BrokerService}
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually
    +import scala.concurrent.duration._
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
     
    -class MQTTStreamSuite extends FunSuite {
    -
    -  val batchDuration = Seconds(1)
    +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
     
    +  private val batchDuration = Milliseconds(500)
       private val master: String = "local[2]"
    -
       private val framework: String = this.getClass.getSimpleName
    +  private val freePort = findFreePort()
    +  private val brokerUri = "//localhost:" + freePort
    +  private val topic = "def"
    +  private var ssc: StreamingContext = _
    +  private val persistenceDir = Utils.createTempDir()
    +  private var broker: BrokerService = _
    +  private var connector: TransportConnector = _
     
    -  test("mqtt input stream") {
    -    val ssc = new StreamingContext(master, framework, batchDuration)
    -    val brokerUrl = "abc"
    -    val topic = "def"
    +  before {
    +    ssc = new StreamingContext(master, framework, batchDuration)
    +    setupMQTT
    +  }
     
    -    // tests the API, does not actually test data receiving
    -    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
    -    val test2: ReceiverInputDStream[String] =
    -      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
    +  after {
    +    if (ssc != null) {
    +      ssc.stop()
    +      ssc = null
    +    }
    +    Utils.deleteRecursively(persistenceDir)
    +    tearDownMQTT
    +  }
     
    -    // TODO: Actually test receiving data
    +  test("mqtt input stream") {
    +    val sendMessage = "MQTT demo for spark streaming"
    +    val receiveStream: ReceiverInputDStream[String] =
    +      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
    +    var receiveMessage: List[String] = List()
    +    receiveStream.foreachRDD { rdd =>
    +      if (rdd.collect.length > 0) {
    +        receiveMessage = receiveMessage ::: List(rdd.first)
    +        receiveMessage
    +      }
    +    }
    +    ssc.start()
    +    publishData(sendMessage)
    +    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
    +      assert(sendMessage.equals(receiveMessage(0)))
    +    }
         ssc.stop()
       }
    +
    +  private def setupMQTT() {
    +    broker = new BrokerService()
    +    connector = new TransportConnector()
    +    connector.setName("mqtt")
    +    connector.setUri(new URI("mqtt:" + brokerUri))
    +    broker.addConnector(connector)
    +    broker.start()
    +  }
    +
    +  private def tearDownMQTT() {
    +    if (broker != null) {
    +      broker.stop()
    +      broker = null
    +    }
    +    if (connector != null) {
    +      connector.stop()
    +      connector = null
    +    }
    +  }
    +
    +  private def findFreePort(): Int = {
    +    Utils.startServiceOnPort(23456, (trialPort: Int) => {
    +      val socket = new ServerSocket(trialPort)
    +      socket.close()
    +      (null, trialPort)
    +    })._2
    +  }
    +
    +  def publishData(data: String): Unit = {
    +    var client: MqttClient = null
    +    try {
    +      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
    +      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
    +      client.connect()
    +      if (client.isConnected) {
    +        val msgTopic: MqttTopic = client.getTopic(topic)
    +        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
    +        message.setQos(1)
    +        message.setRetained(true)
    +        for (i <- 0 to 10)
    +          msgTopic.publish(message)
    +      }
    +    } catch {
    +      case e: MqttException => println("Exception Caught: " + e)
    --- End diff --
    
    Why can there be an exception here? And if one is thrown, why is it being ignored? Printing the exception and doing nothing else effectively ignores it, so the unit test can pass without actually testing anything. If there is even a slight chance that publishing may fail, please add enough retries to make the test pass reliably.
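
    One way to act on the retry request is sketched below. This is only an illustration, not code from the pull request: `PublishRetry`, `retry`, `maxAttempts`, and `delayMs` are hypothetical names, and the helper is deliberately independent of the Paho client so it can wrap any publishing call. It retries a failing operation a bounded number of times and rethrows the last failure, so the test fails loudly instead of printing and continuing.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper (not part of the PR): bounded retry around a flaky operation.
object PublishRetry {
  /** Runs `op` up to `maxAttempts` times, sleeping `delayMs` between attempts.
    * If every attempt fails, the last exception is rethrown so the caller
    * (for example a unit test) fails instead of silently passing.
    */
  def retry[T](maxAttempts: Int, delayMs: Long)(op: => T): T = {
    Try(op) match {
      case Success(result) => result
      case Failure(_) if maxAttempts > 1 =>
        Thread.sleep(delayMs)                 // brief pause before the next attempt
        retry(maxAttempts - 1, delayMs)(op)   // `op` is by-name, so it is re-evaluated
      case Failure(e) => throw e              // attempts exhausted: surface the failure
    }
  }
}
```

    In the suite, the body of `publishData` could then be wrapped as `PublishRetry.retry(maxAttempts = 3, delayMs = 100) { ... }` with the `catch` that only prints removed, assuming a few hundred milliseconds of extra delay is acceptable for the test.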

