You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2016/08/22 16:58:00 UTC

bahir git commit: [BAHIR-51] Add new configuration options to MqttStreamSource.

Repository: bahir
Updated Branches:
  refs/heads/master 1c0f4afcb -> a351549cf


[BAHIR-51] Add new configuration options to MqttStreamSource.

Add new configuration options to enable authenticated connections and
configurable quality-of-service settings.

Closes #22


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/a351549c
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/a351549c
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/a351549c

Branch: refs/heads/master
Commit: a351549cf634adf5249862599166ef9ed9073725
Parents: 1c0f4af
Author: Prashant Sharma <pr...@in.ibm.com>
Authored: Thu Aug 11 15:56:20 2016 +0530
Committer: Luciano Resende <lr...@apache.org>
Committed: Mon Aug 22 09:58:02 2016 -0700

----------------------------------------------------------------------
 sql-streaming-mqtt/README.md                    | 16 ++++++
 .../sql/streaming/mqtt/MQTTStreamSource.scala   | 60 +++++++++++++++++---
 .../streaming/mqtt/MQTTStreamSourceSuite.scala  |  2 +-
 3 files changed, 69 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/a351549c/sql-streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/README.md b/sql-streaming-mqtt/README.md
index c1a078b..c9ea8cf 100644
--- a/sql-streaming-mqtt/README.md
+++ b/sql-streaming-mqtt/README.md
@@ -44,6 +44,22 @@ Setting values for option `localStorage` and `clientId` helps in recovering in c
         .option("clientId", "some-client-id")
         .load("tcp://localhost:1883")
 
+## Configuration options.
+
+This source uses [Eclipse Paho Java Client](https://eclipse.org/paho/clients/java/). Client API documentation is located [here](http://www.eclipse.org/paho/files/javadoc/index.html).
+
+ * `brokerUrl` The URL the MqttClient connects to. Set either this or `path` to the URL of the MQTT server, e.g. `tcp://localhost:1883`.
+ * `persistence` By default, incoming messages are stored on disk. If `memory` is provided as the value for this option, recovery on restart is not supported.
+ * `topic` The topic the MqttClient subscribes to.
+ * `clientId` The client ID this client is associated with. Provide the same value to recover a stopped client.
+ * `QoS` The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.
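+ * `username` Sets the user name to use for the connection to the MQTT server. Do not set it if the server does not require it; setting it to an empty value will lead to errors. Takes effect only when `password` is also set.
+ * `password` Sets the password to use for the connection. Takes effect only when `username` is also set.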
+ * `cleanSession` Setting it to true starts a clean session, discarding all messages checkpointed by a previous run of this source. Defaults to false.
+ * `connectionTimeout` Sets the connection timeout. A value of 0 is interpreted as waiting until the client connects. See `MqttConnectOptions.setConnectionTimeout` for more information.
+ * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`.
+ * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`.
+
 ### Scala API
 
 An example, for scala API to count words from incoming message stream. 

http://git-wip-us.apache.org/repos/asf/bahir/blob/a351549c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
index 471886a..8857edb 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.bahir.sql.streaming.mqtt
 
+import java.nio.charset.Charset
 import java.sql.Timestamp
 import java.text.SimpleDateFormat
 import java.util.Calendar
@@ -44,9 +45,31 @@ object MQTTStreamConstants {
     :: StructField("timestamp", TimestampType) :: Nil)
 }
 
+/**
+ * A text-based MQTT stream source. It interprets the payload of each incoming message by converting
+ * the bytes to String using Charset.defaultCharset as charset. Each value is associated with a
+ * timestamp of arrival of the message on the source. It can be used to operate a window on the
+ * incoming stream.
+ *
+ * @param brokerUrl url MqttClient connects to.
+ * @param persistence an instance of MqttClientPersistence. By default it is used for storing
+ *                    incoming messages on disk. If memory is provided as option, then recovery on
+ *                    restart is not supported.
+ * @param topic topic MqttClient subscribes to.
+ * @param clientId the client ID this client is associated with. Provide the same value to recover
+ *                 a stopped client.
+ * @param messageParser parsing logic for processing incoming messages from Mqtt Server.
+ * @param sqlContext Spark provided, SqlContext.
+ * @param mqttConnectOptions an instance of MqttConnectOptions for this Source.
+ * @param qos the maximum quality of service to subscribe each topic at. Messages published at
+ *            a lower quality of service will be received at the published QoS. Messages
+ *            published at a higher quality of service will be received using the QoS specified
+ *            on the subscribe.
+ */
 class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
     topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
-    sqlContext: SQLContext) extends Source with Logging {
+    sqlContext: SQLContext, mqttConnectOptions: MqttConnectOptions, qos: Int)
+  extends Source with Logging {
 
   override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
 
@@ -73,10 +96,6 @@ class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence
   private def initialize(): Unit = {
 
     client = new MqttClient(brokerUrl, clientId, persistence)
-    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
-    mqttConnectOptions.setAutomaticReconnect(true)
-    // This is required to support recovery. TODO: configurable ?
-    mqttConnectOptions.setCleanSession(false)
 
     val callback = new MqttCallbackExtended() {
 
@@ -101,7 +120,7 @@ class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence
     }
     client.setCallback(callback)
     client.connect(mqttConnectOptions)
-    client.subscribe(topic)
+    client.subscribe(topic, qos)
     // It is not possible to initialize offset without `client.connect`
     offset = fetchLastProcessedOffset()
     initLock.countDown() // Release.
@@ -171,7 +190,8 @@ class MQTTStreamSourceProvider extends StreamSourceProvider with DataSourceRegis
         }
     }
 
-    val messageParserWithTimeStamp = (x: Array[Byte]) => (new String(x), Timestamp.valueOf(
+    val messageParserWithTimeStamp = (x: Array[Byte]) =>
+      (new String(x, Charset.defaultCharset()), Timestamp.valueOf(
       MQTTStreamConstants.DATE_FORMAT.format(Calendar.getInstance().getTime)))
 
     // if default is subscribe everything, it leads to getting lot unwanted system messages.
@@ -183,8 +203,32 @@ class MQTTStreamSourceProvider extends StreamSourceProvider with DataSourceRegis
         "\nRecovering from failure is not supported in such a case.")
       MqttClient.generateClientId()})
 
+    val username: Option[String] = parameters.get("username")
+    val password: Option[String] = parameters.get("password")
+    val connectionTimeout: Int = parameters.getOrElse("connectionTimeout",
+      MqttConnectOptions.CONNECTION_TIMEOUT_DEFAULT.toString).toInt
+    val keepAlive: Int = parameters.getOrElse("keepAlive", MqttConnectOptions
+      .KEEP_ALIVE_INTERVAL_DEFAULT.toString).toInt
+    val mqttVersion: Int = parameters.getOrElse("mqttVersion", MqttConnectOptions
+      .MQTT_VERSION_DEFAULT.toString).toInt
+    val cleanSession: Boolean = parameters.getOrElse("cleanSession", "false").toBoolean
+    val qos: Int = parameters.getOrElse("QoS", "1").toInt
+
+    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
+    mqttConnectOptions.setAutomaticReconnect(true)
+    mqttConnectOptions.setCleanSession(cleanSession)
+    mqttConnectOptions.setConnectionTimeout(connectionTimeout)
+    mqttConnectOptions.setKeepAliveInterval(keepAlive)
+    mqttConnectOptions.setMqttVersion(mqttVersion)
+    (username, password) match {
+      case (Some(u: String), Some(p: String)) =>
+        mqttConnectOptions.setUserName(u)
+        mqttConnectOptions.setPassword(p.toCharArray)
+      case _ =>
+    }
+
     new MQTTTextStreamSource(brokerUrl, persistence, topic, clientId,
-      messageParserWithTimeStamp, sqlContext)
+      messageParserWithTimeStamp, sqlContext, mqttConnectOptions, qos)
   }
 
   override def shortName(): String = "mqtt"

http://git-wip-us.apache.org/repos/asf/bahir/blob/a351549c/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
index f6f5ff6..257bd0b 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
@@ -59,7 +59,7 @@ class MQTTStreamSourceSuite extends SparkFunSuite with SharedSparkContext with B
     val dataFrame: DataFrame =
       sqlContext.readStream.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
         .option("topic", "test").option("localStorage", dir).option("clientId", "clientId")
-        .load("tcp://" + mqttTestUtils.brokerUri)
+        .option("QoS", "2").load("tcp://" + mqttTestUtils.brokerUri)
     (sqlContext, dataFrame)
   }