You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2018/12/12 02:13:40 UTC

bahir git commit: [BAHIR-186] SSL support in MQTT structured streaming

Repository: bahir
Updated Branches:
  refs/heads/master 0601698c3 -> a73ab48a2


[BAHIR-186] SSL support in MQTT structured streaming

Closes #74


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/a73ab48a
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/a73ab48a
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/a73ab48a

Branch: refs/heads/master
Commit: a73ab48a2dfec866b2ffa0ccf0d2bfeaba6fc782
Parents: 0601698
Author: Lukasz Antoniak <lu...@gmail.com>
Authored: Wed Dec 5 12:44:28 2018 -0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Tue Dec 11 23:13:13 2018 -0300

----------------------------------------------------------------------
 sql-streaming-mqtt/README.md                    |  38 ++++++++----
 .../bahir/sql/streaming/mqtt/MQTTUtils.scala    |  26 ++++++++
 .../src/test/resources/keystore.jks             | Bin 0 -> 2247 bytes
 .../src/test/resources/truststore.jks           | Bin 0 -> 956 bytes
 .../streaming/mqtt/MQTTStreamSinkSuite.scala    |  33 +++++++---
 .../sql/streaming/mqtt/MQTTTestUtils.scala      |  60 +++++++++++++++----
 6 files changed, 125 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/a73ab48a/sql-streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/README.md b/sql-streaming-mqtt/README.md
index a426a11..721b544 100644
--- a/sql-streaming-mqtt/README.md
+++ b/sql-streaming-mqtt/README.md
@@ -57,19 +57,31 @@ Setting values for option `localStorage` and `clientId` helps in recovering in c
 
 This connector uses [Eclipse Paho Java Client](https://eclipse.org/paho/clients/java/). Client API documentation is located [here](http://www.eclipse.org/paho/files/javadoc/index.html).
 
- * `brokerUrl` An URL MqttClient connects to. Set this or `path` as the URL of the Mqtt Server. e.g. tcp://localhost:1883.
- * `persistence` By default it is used for storing incoming messages on disk. If `memory` is provided as value for this option, then recovery on restart is not supported.
- * `topic` Topic MqttClient subscribes to.
- * `clientId` clientId, this client is associated with. Provide the same value to recover a stopped source client. MQTT sink ignores client identifier, because Spark batch can be distributed across multiple workers whereas MQTT broker does not allow simultanous connections with same ID from multiple hosts.
- * `QoS` The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.
- * `username` Sets the user name to use for the connection to Mqtt Server. Do not set it, if server does not need this. Setting it empty will lead to errors.
- * `password` Sets the password to use for the connection.
- * `cleanSession` Setting it true starts a clean session, removes all checkpointed messages by a previous run of this source. This is set to false by default.
- * `connectionTimeout` Sets the connection timeout, a value of 0 is interpretted as wait until client connects. See `MqttConnectOptions.setConnectionTimeout` for more information.
- * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`.
- * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`.
- * `maxInflight` Same as `MqttConnectOptions.setMaxInflight`
- * `autoReconnect` Same as `MqttConnectOptions.setAutomaticReconnect`
+| Parameter name             | Description                                                                                                                                                                                                                                                                                       | Eclipse Paho reference                                                   |
+|----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------|
+| `brokerUrl`                | URL MQTT client connects to. Specify this parameter or _path_. Example: _tcp://localhost:1883_, _ssl://localhost:1883_.                                                                                                                                                                           |                                                                          |
+| `persistence`              | Defines how incoming messages are stored. If _memory_ is provided as value for this option, recovery on restart is not supported. Otherwise messages are stored on disk and parameter _localStorage_ may define target directory.                                                                 |                                                                          |
+| `topic`                    | Topic which client subscribes to.                                                                                                                                                                                                                                                                 |                                                                          |
+| `clientId`                 | Uniquely identifies client instance. Provide the same value to recover a stopped source client. MQTT sink ignores client identifier, because Spark batch can be distributed across multiple workers whereas MQTT broker does not allow simultaneous connections with same ID from multiple hosts. |                                                                          |
+| `QoS`                      | The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.                                   |                                                                          |
+| `username`                 | User name used to authenticate with MQTT server. Do not set it, if server does not require authentication. Leaving empty may lead to errors.                                                                                                                                                      | `MqttConnectOptions.setUserName`                                         |
+| `password`                 | User password.                                                                                                                                                                                                                                                                                    | `MqttConnectOptions.setPassword`                                         |
+| `cleanSession`             | Setting to _true_ starts a clean session, removes all check-pointed messages persisted during previous run. Defaults to `false`.                                                                                                                                                                  | `MqttConnectOptions.setCleanSession`                                     |
+| `connectionTimeout`        | Sets the connection timeout, a value of _0_ is interpreted as wait until client connects.                                                                                                                                                                                                         | `MqttConnectOptions.setConnectionTimeout`                                |
+| `keepAlive`                | Sets the "keep alive" interval in seconds.                                                                                                                                                                                                                                                        | `MqttConnectOptions.setKeepAliveInterval`                                |
+| `mqttVersion`              | Specify MQTT protocol version.                                                                                                                                                                                                                                                                    | `MqttConnectOptions.setMqttVersion`                                      |
+| `maxInflight`              | Sets the maximum inflight requests. Useful for high volume traffic.                                                                                                                                                                                                                               | `MqttConnectOptions.setMaxInflight`                                      |
+| `autoReconnect`            | Sets whether the client will automatically attempt to reconnect to the server upon connectivity disruption.                                                                                                                                                                                       | `MqttConnectOptions.setAutomaticReconnect`                               |
+| `ssl.protocol`             | SSL protocol. Example: _SSLv3_, _TLS_, _TLSv1_, _TLSv1.2_.                                                                                                                                                                                                                                        | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.protocol`            |
+| `ssl.key.store`            | Absolute path to key store file.                                                                                                                                                                                                                                                                  | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.keyStore`            |
+| `ssl.key.store.password`   | Key store password.                                                                                                                                                                                                                                                                               | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.keyStorePassword`    |
+| `ssl.key.store.type`       | Key store type. Example: _JKS_, _JCEKS_, _PKCS12_.                                                                                                                                                                                                                                                | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.keyStoreType`        |
+| `ssl.key.store.provider`   | Key store provider. Example: _IBMJCE_.                                                                                                                                                                                                                                                            | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.keyStoreProvider`    |
+| `ssl.trust.store`          | Absolute path to trust store file.                                                                                                                                                                                                                                                                | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.trustStore`          |
+| `ssl.trust.store.password` | Trust store password.                                                                                                                                                                                                                                                                             | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.trustStorePassword`  |
+| `ssl.trust.store.type`     | Trust store type. Example: _JKS_, _JCEKS_, _PKCS12_.                                                                                                                                                                                                                                              | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.trustStoreType`      |
+| `ssl.trust.store.provider` | Trust store provider. Example: _IBMJCEFIPS_.                                                                                                                                                                                                                                                      | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.trustStoreProvider`  |
+| `ssl.ciphers`              | List of enabled cipher suites. Example: _SSL_RSA_WITH_AES_128_CBC_SHA_.                                                                                                                                                                                                                           | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.enabledCipherSuites` |
 
 ## Environment variables
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/a73ab48a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala
index a615d28..f0a6f1a 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.bahir.sql.streaming.mqtt
 
+import java.util.Properties
+
 import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttConnectOptions}
 import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
 
@@ -26,6 +28,23 @@ import org.apache.bahir.utils.Logging
 
 
 private[mqtt] object MQTTUtils extends Logging {
+  // Data source option names are case-insensitive, so Paho's camel-case "com.ibm.ssl.*"
+  // keys cannot serve as option names; map our own lower-case, vendor-neutral keys to them.
+  private[mqtt] val sslParamMapping = Map(
+    "ssl.protocol" -> "com.ibm.ssl.protocol",
+    "ssl.key.store" -> "com.ibm.ssl.keyStore",
+    "ssl.key.store.password" -> "com.ibm.ssl.keyStorePassword",
+    "ssl.key.store.type" -> "com.ibm.ssl.keyStoreType",
+    "ssl.key.store.provider" -> "com.ibm.ssl.keyStoreProvider",
+    "ssl.trust.store" -> "com.ibm.ssl.trustStore",
+    "ssl.trust.store.password" -> "com.ibm.ssl.trustStorePassword",
+    "ssl.trust.store.type" -> "com.ibm.ssl.trustStoreType",
+    "ssl.trust.store.provider" -> "com.ibm.ssl.trustStoreProvider",
+    "ssl.ciphers" -> "com.ibm.ssl.enabledCipherSuites",
+    "ssl.key.manager" -> "com.ibm.ssl.keyManager",
+    "ssl.trust.manager" -> "com.ibm.ssl.trustManager"
+  )
+
   private[mqtt] def parseConfigParams(config: Map[String, String]):
       (String, String, String, MqttClientPersistence, MqttConnectOptions, Int) = {
     def e(s: String) = new IllegalArgumentException(s)
@@ -78,6 +97,13 @@ private[mqtt] object MQTTUtils extends Logging {
         mqttConnectOptions.setPassword(p.toCharArray)
       case _ =>
     }
+    // Translate every "ssl.*" option to its Paho key; an unknown one is a configuration error.
+    val sslProperties = new Properties()
+    for ((key, value) <- config if key.startsWith("ssl.")) {
+      val pahoKey = sslParamMapping.getOrElse(key, throw e(s"Unsupported SSL option: $key"))
+      sslProperties.setProperty(pahoKey, value)
+    }
+    mqttConnectOptions.setSSLProperties(sslProperties)
 
     (brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos)
   }

http://git-wip-us.apache.org/repos/asf/bahir/blob/a73ab48a/sql-streaming-mqtt/src/test/resources/keystore.jks
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/resources/keystore.jks b/sql-streaming-mqtt/src/test/resources/keystore.jks
new file mode 100644
index 0000000..4f2200f
Binary files /dev/null and b/sql-streaming-mqtt/src/test/resources/keystore.jks differ

http://git-wip-us.apache.org/repos/asf/bahir/blob/a73ab48a/sql-streaming-mqtt/src/test/resources/truststore.jks
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/resources/truststore.jks b/sql-streaming-mqtt/src/test/resources/truststore.jks
new file mode 100644
index 0000000..80e6d8d
Binary files /dev/null and b/sql-streaming-mqtt/src/test/resources/truststore.jks differ

http://git-wip-us.apache.org/repos/asf/bahir/blob/a73ab48a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
index d72ba17..ecdd942 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
@@ -39,14 +39,16 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructT
 import org.apache.bahir.utils.FileHelper
 
 
-class MQTTStreamSinkSuite extends SparkFunSuite with SharedSparkContext with BeforeAndAfter {
+class MQTTStreamSinkSuite(_ssl: Boolean) extends SparkFunSuite
+    with SharedSparkContext with BeforeAndAfter {
   protected var mqttTestUtils: MQTTTestUtils = _
   protected val tempDir: File = new File(System.getProperty("java.io.tmpdir") + "/mqtt-test/")
   protected val messages = new mutable.HashMap[Int, String]
   protected var testClient: MqttClient = _
 
   before {
-    mqttTestUtils = new MQTTTestUtils(tempDir)
+    SparkEnv.get.conf.set("spark.mqtt.client.connect.attempts", "1")
+    mqttTestUtils = new MQTTTestUtils(tempDir, ssl = _ssl)
     mqttTestUtils.setup()
     tempDir.mkdirs()
     messages.clear()
@@ -70,17 +72,22 @@ class MQTTStreamSinkSuite extends SparkFunSuite with SharedSparkContext with Bef
   }
 
   protected def sendToMQTT(dataFrame: DataFrame): StreamingQuery = {
-    dataFrame.writeStream
+    val protocol = if (_ssl) "ssl" else "tcp"
+    val writer = dataFrame.writeStream
       .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider")
       .option("topic", "test").option("localStorage", tempDir.getAbsolutePath)
       .option("clientId", "clientId").option("QoS", "2")
-      .start("tcp://" + mqttTestUtils.brokerUri)
+    if (_ssl) {
+      writer.option("ssl.trust.store", mqttTestUtils.clientTrustStore.getAbsolutePath)
+        .option("ssl.trust.store.type", "JKS")
+        .option("ssl.trust.store.password", mqttTestUtils.clientTrustStorePassword)
+    }
+    writer.start(protocol + "://" + mqttTestUtils.brokerUri)
   }
 }
 
-class BasicMQTTSinkSuite extends MQTTStreamSinkSuite {
+class BasicMQTTSinkSuite extends MQTTStreamSinkSuite(false) {
   test("broker down") {
-    SparkEnv.get.conf.set("spark.mqtt.client.connect.attempts", "1")
     SparkSession.setActiveSession(SparkSession.builder().getOrCreate())
     val provider = new MQTTStreamSinkProvider
     val parameters = Map(
@@ -138,7 +145,19 @@ class BasicMQTTSinkSuite extends MQTTStreamSinkSuite {
   }
 }
 
-class StressTestMQTTSink extends MQTTStreamSinkSuite {
+/** Exercises the MQTT sink with the base suite's SSL flag switched on. */
+class MQTTSSLSinkSuite extends MQTTStreamSinkSuite(true) {
+  test("verify SSL connectivity") {
+    val (first, second) = ("Hello, World!", "MQTT is a message queue.")
+    val (_, dataFrame) = createContextAndDF(first, second)
+    val query = sendToMQTT(dataFrame)
+    query.awaitTermination(5000)
+    val received = messages.values.toSet
+    assert(Set(first, second).equals(received))
+  }
+}
+
+class StressTestMQTTSink extends MQTTStreamSinkSuite(false) {
   // run with -Xmx1024m
   test("Send and receive messages of size 100MB.") {
     val freeMemory: Long = Runtime.getRuntime.freeMemory()

http://git-wip-us.apache.org/repos/asf/bahir/blob/a73ab48a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTTestUtils.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTTestUtils.scala
index 893a145..b891c93 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTTestUtils.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTTestUtils.scala
@@ -18,22 +18,30 @@
 package org.apache.bahir.sql.streaming.mqtt
 
 import java.io.File
+import java.io.FileInputStream
 import java.net.{ServerSocket, URI}
 import java.nio.charset.Charset
+import java.security.{KeyStore, SecureRandom}
+import java.util.Properties
+import javax.net.ssl.KeyManagerFactory
 
 import scala.collection.mutable
 
-import org.apache.activemq.broker.{BrokerService, TransportConnector}
+import org.apache.activemq.broker.{BrokerService, SslBrokerService, SslContext, TransportConnector}
 import org.eclipse.paho.client.mqttv3._
-import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
 
 import org.apache.bahir.utils.Logging
 
 
-class MQTTTestUtils(tempDir: File, port: Int = 0) extends Logging {
+class MQTTTestUtils(tempDir: File, port: Int = 0, ssl: Boolean = false) extends Logging {
 
   private val brokerHost = "127.0.0.1"
   private val brokerPort: Int = if (port == 0) findFreePort() else port
+  val serverKeyStore = new File("src/test/resources/keystore.jks")
+  val serverKeyStorePassword = "changeit"
+  val clientTrustStore = new File("src/test/resources/truststore.jks")
+  val clientTrustStorePassword = "changeit"
 
   private var broker: BrokerService = _
   private var connector: TransportConnector = _
@@ -50,11 +58,21 @@ class MQTTTestUtils(tempDir: File, port: Int = 0) extends Logging {
   }
 
   def setup(): Unit = {
-    broker = new BrokerService()
+    broker = if (ssl) new SslBrokerService() else new BrokerService()
     broker.setDataDirectoryFile(tempDir)
+    val protocol = if (ssl) "mqtt+ssl" else "mqtt"
+    if (ssl) {
+      val keyStore = KeyStore.getInstance("JKS")
+      val in = new FileInputStream(serverKeyStore)
+      // KeyStore.load does not close its stream; close it here so the handle is not leaked.
+      try keyStore.load(in, serverKeyStorePassword.toCharArray) finally in.close()
+      val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
+      kmf.init(keyStore, serverKeyStorePassword.toCharArray)
+      broker.setSslContext(new SslContext(kmf.getKeyManagers, null, new SecureRandom()))
+    }
     connector = new TransportConnector()
     connector.setName("mqtt")
-    connector.setUri(new URI("mqtt://" + brokerUri))
+    connector.setUri(new URI(protocol + "://" + brokerUri))
     broker.addConnector(connector)
     broker.start()
   }
@@ -76,12 +94,10 @@ class MQTTTestUtils(tempDir: File, port: Int = 0) extends Logging {
   def publishData(topic: String, data: String, N: Int = 1): Unit = {
     var client: MqttClient = null
     try {
-      val persistence = new MemoryPersistence()
-      client = new MqttClient("tcp://" + brokerUri, MqttClient.generateClientId(), persistence)
-      client.connect()
+      client = connectToServer(new MemoryPersistence(), null)
       if (client.isConnected) {
         val msgTopic = client.getTopic(topic)
-        for (i <- 0 until N) {
+        for (_ <- 0 until N) {
           try {
             Thread.sleep(20)
             val message = new MqttMessage(data.getBytes())
@@ -107,7 +123,6 @@ class MQTTTestUtils(tempDir: File, port: Int = 0) extends Logging {
   }
 
   def subscribeData(topic: String, messages: mutable.Map[Int, String]): MqttClient = {
-    val client = new MqttClient("tcp://" + brokerUri, MqttClient.generateClientId(), null)
     val callback = new MqttCallbackExtended() {
       override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
         messages.put(message.getId, new String(message.getPayload, Charset.defaultCharset()))
@@ -122,12 +137,33 @@ class MQTTTestUtils(tempDir: File, port: Int = 0) extends Logging {
       override def connectComplete(reconnect: Boolean, serverURI: String): Unit = {
       }
     }
-    client.setCallback(callback)
-    client.connect()
+    val client = connectToServer(null, callback)
     client.subscribe(topic)
     client
   }
 
+  /** Connects a fresh Paho client to the test broker (ssl or tcp); callback may be null. */
+  def connectToServer(
+    persistence: MqttClientPersistence, callback: MqttCallbackExtended
+  ): MqttClient = {
+    val scheme = if (ssl) "ssl" else "tcp"
+    val serverUri = scheme + "://" + brokerUri
+    val mqttClient = new MqttClient(serverUri, MqttClient.generateClientId(), persistence)
+    val options = new MqttConnectOptions()
+    if (ssl) {
+      val trustSettings = new Properties()
+      Seq(
+        "com.ibm.ssl.trustStore" -> clientTrustStore.getAbsolutePath,
+        "com.ibm.ssl.trustStoreType" -> "JKS",
+        "com.ibm.ssl.trustStorePassword" -> clientTrustStorePassword
+      ).foreach { case (name, value) => trustSettings.setProperty(name, value) }
+      options.setSSLProperties(trustSettings)
+    }
+    Option(callback).foreach(cb => mqttClient.setCallback(cb))
+    mqttClient.connect(options)
+    mqttClient
+  }
+
   def sleepUntil(predicate: => Boolean, timeout: Long): Unit = {
     val deadline = System.currentTimeMillis() + timeout
     while (System.currentTimeMillis() < deadline) {