You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2016/08/22 16:58:00 UTC
bahir git commit: [BAHIR-51] Add new configuration options to
MqttStreamSource.
Repository: bahir
Updated Branches:
refs/heads/master 1c0f4afcb -> a351549cf
[BAHIR-51] Add new configuration options to MqttStreamSource.
Add new configuration options to enable secured connections and
other quality of services.
Closes #22
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/a351549c
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/a351549c
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/a351549c
Branch: refs/heads/master
Commit: a351549cf634adf5249862599166ef9ed9073725
Parents: 1c0f4af
Author: Prashant Sharma <pr...@in.ibm.com>
Authored: Thu Aug 11 15:56:20 2016 +0530
Committer: Luciano Resende <lr...@apache.org>
Committed: Mon Aug 22 09:58:02 2016 -0700
----------------------------------------------------------------------
sql-streaming-mqtt/README.md | 16 ++++++
.../sql/streaming/mqtt/MQTTStreamSource.scala | 60 +++++++++++++++++---
.../streaming/mqtt/MQTTStreamSourceSuite.scala | 2 +-
3 files changed, 69 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/a351549c/sql-streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/README.md b/sql-streaming-mqtt/README.md
index c1a078b..c9ea8cf 100644
--- a/sql-streaming-mqtt/README.md
+++ b/sql-streaming-mqtt/README.md
@@ -44,6 +44,22 @@ Setting values for option `localStorage` and `clientId` helps in recovering in c
.option("clientId", "some-client-id")
.load("tcp://localhost:1883")
+## Configuration options.
+
+This source uses [Eclipse Paho Java Client](https://eclipse.org/paho/clients/java/). Client API documentation is located [here](http://www.eclipse.org/paho/files/javadoc/index.html).
+
+ * `brokerUrl` A url MqttClient connects to. Set this or `path` as the url of the Mqtt Server. e.g. tcp://localhost:1883.
+ * `persistence` By default it is used for storing incoming messages on disk. If `memory` is provided as value for this option, then recovery on restart is not supported.
+ * `topic` Topic MqttClient subscribes to.
+ * `clientId` clientId, this client is associated with. Provide the same value to recover a stopped client.
+ * `QoS` The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.
+ * `username` Sets the user name to use for the connection to Mqtt Server. Do not set it, if server does not need this. Setting it empty will lead to errors.
+ * `password` Sets the password to use for the connection.
+ * `cleanSession` Setting it true starts a clean session, removes all checkpointed messages by a previous run of this source. This is set to false by default.
+ * `connectionTimeout` Sets the connection timeout, a value of 0 is interpreted as wait until client connects. See `MqttConnectOptions.setConnectionTimeout` for more information.
+ * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`.
+ * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`.
+
### Scala API
An example, for scala API to count words from incoming message stream.
http://git-wip-us.apache.org/repos/asf/bahir/blob/a351549c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
index 471886a..8857edb 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
@@ -17,6 +17,7 @@
package org.apache.bahir.sql.streaming.mqtt
+import java.nio.charset.Charset
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Calendar
@@ -44,9 +45,31 @@ object MQTTStreamConstants {
:: StructField("timestamp", TimestampType) :: Nil)
}
+/**
+ * A Text based mqtt stream source, it interprets the payload of each incoming message by converting
+ * the bytes to String using Charset.defaultCharset as charset. Each value is associated with a
+ * timestamp of arrival of the message on the source. It can be used to operate a window on the
+ * incoming stream.
+ *
+ * @param brokerUrl url MqttClient connects to.
+ * @param persistence an instance of MqttClientPersistence. By default it is used for storing
+ * incoming messages on disk. If memory is provided as option, then recovery on
+ * restart is not supported.
+ * @param topic topic MqttClient subscribes to.
+ * @param clientId clientId, this client is associated with. Provide the same value to recover
+ * a stopped client.
+ * @param messageParser parsing logic for processing incoming messages from Mqtt Server.
+ * @param sqlContext Spark provided, SqlContext.
+ * @param mqttConnectOptions an instance of MqttConnectOptions for this Source.
+ * @param qos the maximum quality of service to subscribe each topic at. Messages published at
+ * a lower quality of service will be received at the published QoS. Messages
+ * published at a higher quality of service will be received using the QoS specified
+ * on the subscribe.
+ */
class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
- sqlContext: SQLContext) extends Source with Logging {
+ sqlContext: SQLContext, mqttConnectOptions: MqttConnectOptions, qos: Int)
+ extends Source with Logging {
override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
@@ -73,10 +96,6 @@ class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence
private def initialize(): Unit = {
client = new MqttClient(brokerUrl, clientId, persistence)
- val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
- mqttConnectOptions.setAutomaticReconnect(true)
- // This is required to support recovery. TODO: configurable ?
- mqttConnectOptions.setCleanSession(false)
val callback = new MqttCallbackExtended() {
@@ -101,7 +120,7 @@ class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence
}
client.setCallback(callback)
client.connect(mqttConnectOptions)
- client.subscribe(topic)
+ client.subscribe(topic, qos)
// It is not possible to initialize offset without `client.connect`
offset = fetchLastProcessedOffset()
initLock.countDown() // Release.
@@ -171,7 +190,8 @@ class MQTTStreamSourceProvider extends StreamSourceProvider with DataSourceRegis
}
}
- val messageParserWithTimeStamp = (x: Array[Byte]) => (new String(x), Timestamp.valueOf(
+ val messageParserWithTimeStamp = (x: Array[Byte]) =>
+ (new String(x, Charset.defaultCharset()), Timestamp.valueOf(
MQTTStreamConstants.DATE_FORMAT.format(Calendar.getInstance().getTime)))
// if default is subscribe everything, it leads to getting lot unwanted system messages.
@@ -183,8 +203,32 @@ class MQTTStreamSourceProvider extends StreamSourceProvider with DataSourceRegis
"\nRecovering from failure is not supported in such a case.")
MqttClient.generateClientId()})
+ val username: Option[String] = parameters.get("username")
+ val password: Option[String] = parameters.get("password")
+ val connectionTimeout: Int = parameters.getOrElse("connectionTimeout",
+ MqttConnectOptions.CONNECTION_TIMEOUT_DEFAULT.toString).toInt
+ val keepAlive: Int = parameters.getOrElse("keepAlive", MqttConnectOptions
+ .KEEP_ALIVE_INTERVAL_DEFAULT.toString).toInt
+ val mqttVersion: Int = parameters.getOrElse("mqttVersion", MqttConnectOptions
+ .MQTT_VERSION_DEFAULT.toString).toInt
+ val cleanSession: Boolean = parameters.getOrElse("cleanSession", "false").toBoolean
+ val qos: Int = parameters.getOrElse("QoS", "1").toInt
+
+ val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
+ mqttConnectOptions.setAutomaticReconnect(true)
+ mqttConnectOptions.setCleanSession(cleanSession)
+ mqttConnectOptions.setConnectionTimeout(connectionTimeout)
+ mqttConnectOptions.setKeepAliveInterval(keepAlive)
+ mqttConnectOptions.setMqttVersion(mqttVersion)
+ (username, password) match {
+ case (Some(u: String), Some(p: String)) =>
+ mqttConnectOptions.setUserName(u)
+ mqttConnectOptions.setPassword(p.toCharArray)
+ case _ =>
+ }
+
new MQTTTextStreamSource(brokerUrl, persistence, topic, clientId,
- messageParserWithTimeStamp, sqlContext)
+ messageParserWithTimeStamp, sqlContext, mqttConnectOptions, qos)
}
override def shortName(): String = "mqtt"
http://git-wip-us.apache.org/repos/asf/bahir/blob/a351549c/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
index f6f5ff6..257bd0b 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
@@ -59,7 +59,7 @@ class MQTTStreamSourceSuite extends SparkFunSuite with SharedSparkContext with B
val dataFrame: DataFrame =
sqlContext.readStream.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic", "test").option("localStorage", dir).option("clientId", "clientId")
- .load("tcp://" + mqttTestUtils.brokerUri)
+ .option("QoS", "2").load("tcp://" + mqttTestUtils.brokerUri)
(sqlContext, dataFrame)
}