Posted to dev@bahir.apache.org by ScrapCodes <gi...@git.apache.org> on 2016/07/26 05:27:21 UTC

[GitHub] bahir pull request #13: Add SQL Streaming MQTT support.

GitHub user ScrapCodes opened a pull request:

    https://github.com/apache/bahir/pull/13

    Add SQL Streaming MQTT support.

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ScrapCodes/bahir mqtt-sql-support

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir/pull/13.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #13
    
----
commit 4ef08eaf1bdd578d638f00c31d2c7556c4363237
Author: Prashant Sharma <pr...@in.ibm.com>
Date:   2016-07-26T05:25:55Z

    Add MQTT support

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] bahir issue #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    @frreiss Can you please take another look?



[GitHub] bahir pull request #13: Add SQL Streaming MQTT support.

Posted by frreiss <gi...@git.apache.org>.
Github user frreiss commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r72210666
  
    --- Diff: streaming-mqtt/sql/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStream.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, Encoder, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{TimestampType, StringType, StructField, StructType}
    +
    +
    +object MQTTStream {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStream(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  private val offset = new AtomicLong(0)
    +  override def schema: StructType = MQTTStream.SCHEMA_DEFAULT
    +
    +  @GuardedBy("this")
    +  private var messages = new ArrayBuffer[(String, Timestamp)]
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = {
    +        offset.getAndIncrement()
    +        messages += messageParser(message.getPayload)
    --- End diff --
    
    I don't think the @GuardedBy annotation adds `synchronized` blocks automatically. Maybe you meant to make the `messages` field synchronized?
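To illustrate the point above: `@GuardedBy` (from `javax.annotation.concurrent`) only documents which lock protects a field; it does not insert any locking. A minimal sketch, using a hypothetical `GuardedBuffer` class rather than the PR's source, where every access to the buffer takes the monitor explicitly:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical class for illustration. `@GuardedBy("this")` would only
// document the locking policy; the locking itself has to be written out,
// so every read and write of `messages` goes through this.synchronized.
class GuardedBuffer[T] {
  // Guarded by `this`: touched only inside the synchronized blocks below.
  private val messages = new ArrayBuffer[T]

  def append(m: T): Unit = this.synchronized { messages += m; () }

  def size: Int = this.synchronized { messages.size }

  // Hand out a copy so callers never iterate the live buffer without the lock.
  def snapshot(): Vector[T] = this.synchronized { messages.toVector }
}

object GuardedBufferDemo {
  // Starts `threads` writers that each append `perThread` distinct integers.
  def fill(threads: Int, perThread: Int): GuardedBuffer[Int] = {
    val buf = new GuardedBuffer[Int]
    val workers = (0 until threads).map { t =>
      new Thread(new Runnable {
        def run(): Unit = (0 until perThread).foreach(i => buf.append(t * perThread + i))
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    buf
  }
}
```

Without the `synchronized` blocks, concurrent `+=` calls on an `ArrayBuffer` can lose elements or throw, because `ArrayBuffer` is not thread-safe. A concurrent collection, such as the `TrieMap` used in a later revision of this PR, is another option.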



[GitHub] bahir issue #13: Add SQL Streaming MQTT support.

Posted by frreiss <gi...@git.apache.org>.
Github user frreiss commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    I don't think a warning is going to be enough. As far as I can see, this code will exhaust the Java heap and crash the Spark executor processes every time it runs. And when those processes restart, they will not be able to replay any lost data, because there is no code to repopulate the messages buffer. Even if there was code to repopulate the buffer, filling the buffer up again would only result in exhausting the heap again. Can MQTT's client-side persistence be used to provide the ability to replay arbitrarily back in time?



[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73461457
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
    +
    +  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
    +
    +  private val messages = new TrieMap[Int, (String, Timestamp)]
    +
    +  private val initLock = new CountDownLatch(1)
    +
    +  private var offset = 0
    +
    +  private var client: MqttClient = _
    +
    +  private def fetchLastProcessedOffset(): Int = {
    +    Try(store.maxProcessedOffset) match {
    +      case Success(x) =>
    +        log.info(s"Recovering from last stored offset $x")
    +        x
    +      case Failure(e) => 0
    +    }
    +  }
    +
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    client = new MqttClient(brokerUrl, clientId, persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    // This is required to support recovery. TODO: configurable ?
    +    mqttConnectOptions.setCleanSession(false)
    +
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
    +        initLock.await() // Wait for initialization to complete.
    +        val temp = offset + 1
    +        messages.put(temp, messageParser(message.getPayload))
    +        offset = temp
    +        log.trace(s"Message arrived, $topic_ $message")
    +      }
    +
    +      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
    +      }
    +
    +      override def connectionLost(cause: Throwable): Unit = {
    +        log.warn("Connection to mqtt server lost.", cause)
    +      }
    +
    +      override def connectComplete(reconnect: Boolean, serverURI: String): Unit = {
    +        log.info(s"Connect complete $serverURI. Is it a reconnect?: $reconnect")
    +      }
    +    }
    +    client.setCallback(callback)
    +    client.connect(mqttConnectOptions)
    +    client.subscribe(topic)
    +    // It is not possible to initialize offset without `client.connect`
    +    offset = fetchLastProcessedOffset()
    --- End diff --
    
    Hi Fred, the offset is initialized here, because it can only be fetched once the client has connected. This is also the reason I am holding `initLock`.
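The initialization order described above (connect, recover the last processed offset, then release callbacks blocked on the latch) can be modelled without an MQTT client. A simplified sketch: `LatchedOffsets` is a hypothetical name, and `recoverOffset` stands in for `fetchLastProcessedOffset()`. One design choice differs from a naive version: the callback waits on the latch *before* taking the lock, so `initialize()` can still acquire it.

```scala
import java.util.concurrent.CountDownLatch
import scala.collection.concurrent.TrieMap

// Hypothetical, MQTT-free model of the initialization order discussed above.
// `recoverOffset` stands in for reading the last processed offset from the store.
class LatchedOffsets(recoverOffset: () => Int) {
  private val initLock = new CountDownLatch(1)
  private val messages = TrieMap.empty[Int, String]
  private var offset = 0 // guarded by `this`

  // Would be invoked from the client's callback thread. Waiting on the latch
  // happens outside the lock, so initialize() is never blocked by a callback.
  def messageArrived(payload: String): Int = {
    initLock.await()
    this.synchronized {
      offset += 1
      messages.put(offset, payload)
      offset
    }
  }

  // Called once the client is connected: recover the offset, then let any
  // callbacks that arrived early proceed.
  def initialize(): Unit = {
    this.synchronized { offset = recoverOffset() }
    initLock.countDown()
  }

  def currentOffset: Int = this.synchronized(offset)
  def get(i: Int): Option[String] = messages.get(i)
}
```

A message that arrives before `initialize()` is held back and then numbered after the recovered offset rather than from zero.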



[GitHub] bahir issue #13: Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    @frreiss you are right about that. SQL streaming may invoke `getBatch` with practically any offset range at any point in time, so there is no definite strategy for deciding which messages are stale enough to discard. The socket source takes the same approach.
    
    We mention this limitation in a warning when the receiver starts.
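The constraint on `getBatch` can be made concrete. `Source.getBatch(start, end)` returns the records in the range (start, end], with `start = None` meaning "from the first record", so a source serving batches from a map keyed by offset can only answer if every offset in the requested range is still buffered. A sketch with plain `Int` offsets instead of Spark's `Offset` type; `BatchRange` is a hypothetical helper:

```scala
// Hypothetical helper modelling the getBatch contract: return the records in
// the half-open range (start, end]; start = None means "from the first record".
// Offsets are plain Ints starting at 1, not Spark's Offset type.
object BatchRange {
  def slice[V](messages: scala.collection.Map[Int, V],
               start: Option[Int], end: Int): Option[Seq[V]] = {
    val wanted = (start.getOrElse(0) + 1) to end
    // If any offset in the range has been dropped, the batch cannot be rebuilt.
    if (wanted.forall(messages.contains)) Some(wanted.map(messages)) else None
  }
}
```

Because any range may be requested, discarding even one message makes some future batch unanswerable, which is why the buffer is kept whole.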



[GitHub] bahir issue #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    Potential issues with this source:
    
    1) Not all MQTT options are exposed to the end user. I am not sure how useful they would be.
    
    2) The message parser is not pluggable, which limits what a user can do with our source.
    
    3) The persistence layer currently in use creates one file per stored message. This can cause serious problems if the number of messages grows very large: it writes to the local filesystem, and not all filesystems support that many files in a single directory.
    
    4) I have not yet run a very long job with this source to study its memory usage and so on.
    
    ....
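Regarding item 2: the PR's source already takes `messageParser: Array[Byte] => (String, Timestamp)` as a constructor parameter, so making the parser pluggable could mean letting users supply such a function. A sketch of two parsers a user might plug in; the object, method names and the `<millis>|<text>` payload format are hypothetical:

```scala
import java.nio.charset.StandardCharsets
import java.sql.Timestamp

// Hypothetical parsers matching the PR's constructor parameter
// `messageParser: Array[Byte] => (String, Timestamp)`.
object Parsers {
  type MessageParser = Array[Byte] => (String, Timestamp)

  // Decode the payload as UTF-8 and stamp it with the arrival time.
  def utf8WithArrivalTime(now: () => Long = () => System.currentTimeMillis()): MessageParser =
    bytes => (new String(bytes, StandardCharsets.UTF_8), new Timestamp(now()))

  // Assumed payload format "<epochMillis>|<text>": keep the sender's timestamp.
  val senderStamped: MessageParser = { bytes =>
    val s = new String(bytes, StandardCharsets.UTF_8)
    val sep = s.indexOf('|')
    require(sep > 0, s"expected '<millis>|<text>', got: $s")
    (s.substring(sep + 1), new Timestamp(s.substring(0, sep).toLong))
  }
}
```

The injectable clock in `utf8WithArrivalTime` is only there to make the parser deterministic in tests.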



[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by frreiss <gi...@git.apache.org>.
Github user frreiss commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73422375
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
    +
    +  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
    +
    +  private val messages = new TrieMap[Int, (String, Timestamp)]
    +
    +  private val initLock = new CountDownLatch(1)
    +
    +  private var offset = 0
    +
    +  private var client: MqttClient = _
    +
    +  private def fetchLastProcessedOffset(): Int = {
    +    Try(store.maxProcessedOffset) match {
    +      case Success(x) =>
    +        log.info(s"Recovering from last stored offset $x")
    +        x
    +      case Failure(e) => 0
    +    }
    +  }
    +
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    client = new MqttClient(brokerUrl, clientId, persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    // This is required to support recovery. TODO: configurable ?
    +    mqttConnectOptions.setCleanSession(false)
    +
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
    +        initLock.await() // Wait for initialization to complete.
    +        val temp = offset + 1
    +        messages.put(temp, messageParser(message.getPayload))
    +        offset = temp
    +        log.trace(s"Message arrived, $topic_ $message")
    +      }
    +
    +      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
    --- End diff --
    
    I feel like this callback probably needs to do something to ensure that there are no duplicate or out of order messages in the message buffer. What is the interaction between these two callbacks (messageArrived/deliveryComplete) and the different QoS levels in MQTT?



[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by frreiss <gi...@git.apache.org>.
Github user frreiss commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73422074
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
    +
    +  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
    +
    +  private val messages = new TrieMap[Int, (String, Timestamp)]
    +
    +  private val initLock = new CountDownLatch(1)
    +
    +  private var offset = 0
    +
    +  private var client: MqttClient = _
    +
    +  private def fetchLastProcessedOffset(): Int = {
    +    Try(store.maxProcessedOffset) match {
    +      case Success(x) =>
    +        log.info(s"Recovering from last stored offset $x")
    +        x
    +      case Failure(e) => 0
    +    }
    +  }
    +
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    client = new MqttClient(brokerUrl, clientId, persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    // This is required to support recovery. TODO: configurable ?
    +    mqttConnectOptions.setCleanSession(false)
    +
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
    --- End diff --
    
    Does MQTT guarantee that this method will be called exactly once, even if the client or server crashes and is restarted?
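For context on the QoS questions: MQTT defines QoS 0 (at most once), QoS 1 (at least once, so duplicates are possible) and QoS 2 (exactly once between client and broker). Paho's `MqttCallback` documentation also notes that if `messageArrived` throws, the client shuts down and QoS 1 and 2 messages are redelivered by the server on reconnect, so the callback should tolerate seeing a message twice. A minimal receiver-side guard, assuming (hypothetically) one publisher per topic that prefixes each payload with an increasing sequence number:

```scala
import scala.collection.mutable

// MQTT QoS levels and their delivery guarantees.
object QoS {
  val AtMostOnce = 0  // no acknowledgement: messages may be lost
  val AtLeastOnce = 1 // PUBACK: messages may be delivered more than once
  val ExactlyOnce = 2 // PUBLISH/PUBREC/PUBREL/PUBCOMP handshake
}

// Hypothetical guard for at-least-once delivery. Assumes one publisher per
// topic that prefixes every payload with an increasing sequence number,
// e.g. "17|reading". Anything at or below the last seen number is dropped.
class SequenceDeduplicator {
  private val lastSeen = mutable.Map.empty[String, Long]

  /** Some(text) for a new message, None for a duplicate or a stale replay. */
  def accept(topic: String, payload: String): Option[String] = synchronized {
    val sep = payload.indexOf('|')
    require(sep > 0, s"missing sequence prefix: $payload")
    val seq = payload.substring(0, sep).toLong
    if (lastSeen.get(topic).exists(_ >= seq)) None
    else {
      lastSeen(topic) = seq
      Some(payload.substring(sep + 1))
    }
  }
}
```

This guard keeps its state only in memory, so after a restart the last sequence numbers would have to be restored too; it also treats out-of-order arrivals as stale.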



[GitHub] bahir issue #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    @scrapcodes I haven't reviewed this yet, but wanted to check whether you think we should hold the release to incorporate this extension.



[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73461705
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala ---
    @@ -0,0 +1,25 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.utils
    +
    +import org.slf4j.LoggerFactory
    +
    +
    +trait Logging {
    --- End diff --
    
    Actually, Spark's `Logging` trait is now `private[spark]`. The other extensions are under the `org.apache.spark` package, so they can still access it. Do you think we should put this one under the Spark package too?



[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73649345
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
    +
    +  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
    +
    +  private val messages = new TrieMap[Int, (String, Timestamp)]
    +
    +  private val initLock = new CountDownLatch(1)
    +
    +  private var offset = 0
    +
    +  private var client: MqttClient = _
    +
    +  private def fetchLastProcessedOffset(): Int = {
    +    Try(store.maxProcessedOffset) match {
    +      case Success(x) =>
    +        log.info(s"Recovering from last stored offset $x")
    +        x
    +      case Failure(e) => 0
    +    }
    +  }
    +
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    client = new MqttClient(brokerUrl, clientId, persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    // This is required to support recovery. TODO: configurable ?
    +    mqttConnectOptions.setCleanSession(false)
    +
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
    +        initLock.await() // Wait for initialization to complete.
    +        val temp = offset + 1
    +        messages.put(temp, messageParser(message.getPayload))
    +        offset = temp
    +        log.trace(s"Message arrived, $topic_ $message")
    +      }
    +
    +      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
    --- End diff --
    
    This is never called for a receive-only client: `deliveryComplete` only fires for messages that this client itself publishes.



[GitHub] bahir issue #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by frreiss <gi...@git.apache.org>.
Github user frreiss commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    LGTM.



[GitHub] bahir issue #13: Add SQL Streaming MQTT support.

Posted by frreiss <gi...@git.apache.org>.
Github user frreiss commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    I don't see any code in this diff to remove old messages from the in-memory buffer after those messages have been consumed. Is there something I'm missing?
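Removing consumed messages requires knowing which offsets will never be requested again; as noted elsewhere in this thread, `getBatch` may be called with practically any range. A sketch of what eviction could look like *if* such a signal existed; `EvictingBuffer` and its `commit` method are hypothetical and not part of the PR or of Spark's API:

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical offset-keyed buffer that can forget consumed messages.
// `commit(upTo)` stands in for a signal that offsets <= upTo will never be
// requested again; this sketch assumes such a signal is available.
class EvictingBuffer[V] {
  private val messages = TrieMap.empty[Long, V]
  private var lastOffset = 0L // highest offset assigned, guarded by `this`
  private var committed = 0L  // every offset <= committed has been dropped

  def add(v: V): Long = synchronized {
    lastOffset += 1
    messages.put(lastOffset, v)
    lastOffset
  }

  // Drop every message with offset <= upTo; commits that move backwards are no-ops.
  def commit(upTo: Long): Unit = synchronized {
    val target = math.min(upTo, lastOffset)
    (committed + 1 to target).foreach(k => messages.remove(k))
    committed = math.max(committed, target)
  }

  def size: Int = messages.size
  def get(offset: Long): Option[V] = messages.get(offset)
}
```

Memory then stays proportional to the uncommitted window rather than to the whole stream.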



[GitHub] bahir issue #13: Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    I think I made a mistake by creating mqtt/dstream and mqtt/sql. I am going to change this to make SQL streaming MQTT a top-level module in the project.


---

[GitHub] bahir issue #13: Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    Yes, I think MQTT's client-side persistence can be used. We can move any message that has been consumed once by streaming to the persistent store. However, streaming may never request certain offsets again, for example those that were checkpointed earlier, so we may also want a configurable timeout. Since we store every message along with its timestamp, that should not be a problem.
    
    P.S. If the persistence is in memory, it may also be exhausted eventually, since it stores everything on the heap in a hashtable.
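A minimal sketch of this idea, assuming a two-tier layout: messages stay in memory until their first read, are then moved to a persistent tier, and are purged once their stored timestamp is older than a configurable TTL. `TieredMessageStore`, `consume`, and `expire` are hypothetical names, and a second in-memory map only stands in for the persistent tier; as the P.S. points out, a real persistent tier would have to live off the heap (for example on disk) to avoid exhausting memory.

```scala
import java.sql.Timestamp
import scala.collection.mutable

// Hypothetical two-tier store. `hot` models the in-memory buffer; `cold` is a
// stand-in for a persistent store such as file-based persistence.
class TieredMessageStore(ttlMillis: Long) {
  private val hot = mutable.Map.empty[Long, (String, Timestamp)]  // guarded by this
  private val cold = mutable.Map.empty[Long, (String, Timestamp)] // guarded by this

  def put(id: Long, msg: (String, Timestamp)): Unit = synchronized { hot(id) = msg }

  /** The first read moves a message to the persistent tier; later reads are served from it. */
  def consume(id: Long): Option[(String, Timestamp)] = synchronized {
    hot.remove(id) match {
      case Some(msg) =>
        cold(id) = msg
        Some(msg)
      case None =>
        cold.get(id)
    }
  }

  /** Drops persisted messages whose stored timestamp is older than the TTL; returns how many. */
  def expire(nowMillis: Long): Int = synchronized {
    val stale = cold.iterator.collect {
      case (id, (_, ts)) if nowMillis - ts.getTime > ttlMillis => id
    }.toList
    stale.foreach(id => cold.remove(id))
    stale.size
  }

  def inMemory: Int = synchronized(hot.size)
  def persisted: Int = synchronized(cold.size)
}
```

Passing the current time into `expire` explicitly keeps the eviction rule deterministic and easy to test.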


---

[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73404347
  
    --- Diff: sql-streaming-mqtt/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister ---
    @@ -0,0 +1 @@
    +org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider
    --- End diff --
    
    Please add the ASF license header.


---

[GitHub] bahir issue #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    @ScrapCodes I added some comments; they should be easy to address. Also, I am OK if not all of the issues you mentioned above are resolved before we commit this code, but I would appreciate it if you could raise JIRAs to make sure we address them in the future.


---

[GitHub] bahir pull request #13: Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r72196577
  
    --- Diff: streaming-mqtt/sql/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStream.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, Encoder, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{TimestampType, StringType, StructField, StructType}
    +
    +
    +object MQTTStream {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStream(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  private val offset = new AtomicLong(0)
    +  override def schema: StructType = MQTTStream.SCHEMA_DEFAULT
    +
    +  @GuardedBy("this")
    +  private var messages = new ArrayBuffer[(String, Timestamp)]
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = {
    +        offset.getAndIncrement()
    +        messages += messageParser(message.getPayload)
    --- End diff --
    
    `messages` is `@GuardedBy("this")`, but it is modified here without holding that lock.


---

[GitHub] bahir pull request #13: Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r72215214
  
    --- Diff: streaming-mqtt/sql/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStream.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, Encoder, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{TimestampType, StringType, StructField, StructType}
    +
    +
    +object MQTTStream {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStream(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  private val offset = new AtomicLong(0)
    +  override def schema: StructType = MQTTStream.SCHEMA_DEFAULT
    +
    +  @GuardedBy("this")
    +  private var messages = new ArrayBuffer[(String, Timestamp)]
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = {
    +        offset.getAndIncrement()
    +        messages += messageParser(message.getPayload)
    --- End diff --
    
    Hmm, actually I assumed that making access to each field synchronized was safe enough. But what if `offset` has been incremented and the message has not yet been appended? A call to `getBatch` in between could then fail.
    
    I am going to surround both statements with `synchronized`.
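The race described above goes away once the append and the offset increment happen under one lock, so a reader can never observe a new offset without its message. The sketch below models this in plain Scala. `GuardedBuffer` and the `Producer` thread, which stands in for the MQTT callback thread, are hypothetical names and not code from the PR.

```scala
import java.sql.Timestamp
import scala.collection.mutable.ArrayBuffer

// Plain-Scala model of the source's shared state. Both fields are guarded by the
// same monitor (`this`), so a reader never sees an offset without its message.
class GuardedBuffer {
  private var offset = 0L                                        // guarded by this
  private val messages = ArrayBuffer.empty[(String, Timestamp)]  // guarded by this

  // Called on the MQTT callback thread in the real source.
  def messageArrived(msg: (String, Timestamp)): Unit = synchronized {
    messages += msg
    offset += 1
  }

  // Called on the streaming engine's thread; returns (offset, buffered count) atomically.
  def snapshot: (Long, Int) = synchronized { (offset, messages.size) }
}

// Stand-in for the MQTT client's callback thread delivering `count` messages.
class Producer(buf: GuardedBuffer, tag: String, count: Int) extends Thread {
  override def run(): Unit = {
    var i = 0
    while (i < count) {
      buf.messageArrived((s"$tag-$i", new Timestamp(0L)))
      i += 1
    }
  }
}
```

One design point to keep in mind: in Scala, a bare `synchronized` inside an anonymous class (such as an `MqttCallbackExtended` instance) locks that anonymous object, not the enclosing source, so it does not exclude methods that synchronize on the source itself. Writing `Outer.this.synchronized { ... }`, where `Outer` is the enclosing class, makes both sides share one lock.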


---

[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73649399
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
    +
    +  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
    +
    +  private val messages = new TrieMap[Int, (String, Timestamp)]
    +
    +  private val initLock = new CountDownLatch(1)
    +
    +  private var offset = 0
    --- End diff --
    
    Actually, we restore this from the checkpoint during initialization.
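Restoring the offset has to finish before any message callback assigns a new offset, and a `CountDownLatch` like the `initLock` in the diff can enforce that ordering. The sketch below is a minimal model. It assumes the recovered value comes from a hypothetical `restore` function, which stands in for reading the last processed offset from the local store. `RecoverableOffset` and `EarlyCallback` are illustrative names.

```scala
import java.util.concurrent.CountDownLatch

// Plain-Scala model of "restore the offset before any callback may use it".
// `restore` is a hypothetical stand-in for reading the last processed offset
// from a checkpoint or local store.
class RecoverableOffset(restore: () => Long) {
  private val initLock = new CountDownLatch(1)
  private var offset = 0L // guarded by this

  /** Runs once, after the client has connected; releases any waiting callbacks. */
  def initialize(): Unit = {
    val recovered = restore()
    synchronized { offset = recovered }
    initLock.countDown()
  }

  /** Callback path: blocks until initialize() has run, then assigns the next offset. */
  def nextOffset(): Long = {
    initLock.await() // wait outside the monitor so initialize() can still acquire it
    synchronized {
      offset += 1
      offset
    }
  }
}

// Stand-in for a message callback that fires before initialization has finished.
class EarlyCallback(src: RecoverableOffset) extends Thread {
  @volatile var assigned: Long = -1L
  override def run(): Unit = { assigned = src.nextOffset() }
}
```

Awaiting the latch outside the lock matters. If a blocked callback held the monitor while waiting, `initialize()` could never enter its own `synchronized` block, and the two threads would deadlock.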


---

[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73407125
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala ---
    @@ -0,0 +1,25 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.utils
    +
    +import org.slf4j.LoggerFactory
    +
    +
    +trait Logging {
    --- End diff --
    
    Do we need our own definition of `Logging`? Or is it OK for us to use the one in Spark (which is what the other extensions use)?
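For comparison, a self-contained mix-in that follows the same pattern as Spark's own `Logging` trait might look like the sketch below. That pattern is a `@transient` logger, created lazily and named after the concrete class with any trailing `$` stripped. The sketch uses `java.util.logging` only as a stand-in for SLF4J, so it has no external dependencies. `MessageStore` is a hypothetical example class.

```scala
import java.util.logging.Logger

// Hypothetical self-contained mix-in modelled on the pattern Spark's Logging trait
// uses: a @transient logger created lazily and named after the concrete class.
// java.util.logging stands in for SLF4J here only to avoid external dependencies.
trait Logging {
  // @transient: not serialized; re-created on first use after deserialization.
  @transient private var log_ : Logger = null

  // Strip the trailing '$' that Scala adds to object class names.
  protected def logName: String = getClass.getName.stripSuffix("$")

  protected def log: Logger = {
    if (log_ == null) log_ = Logger.getLogger(logName)
    log_
  }
}

// Example user of the mix-in.
class MessageStore extends Logging {
  def store(id: Int): Unit = log.fine(s"Stored message $id")
  def loggerName: String = log.getName
}
```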


---

[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73405744
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
    +
    +  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
    +
    +  private val messages = new TrieMap[Int, (String, Timestamp)]
    +
    +  private val initLock = new CountDownLatch(1)
    +
    +  private var offset = 0
    +
    +  private var client: MqttClient = _
    +
    +  private def fetchLastProcessedOffset(): Int = {
    +    Try(store.maxProcessedOffset) match {
    +      case Success(x) =>
    +        log.info(s"Recovering from last stored offset $x")
    +        x
    +      case Failure(e) => 0
    +    }
    +  }
    +
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    client = new MqttClient(brokerUrl, clientId, persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    // This is required to support recovery. TODO: configurable ?
    +    mqttConnectOptions.setCleanSession(false)
    +
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
    +        initLock.await() // Wait for initialization to complete.
    +        val temp = offset + 1
    +        messages.put(temp, messageParser(message.getPayload))
    +        offset = temp
    +        log.trace(s"Message arrived, $topic_ $message")
    +      }
    +
    +      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
    +      }
    +
    +      override def connectionLost(cause: Throwable): Unit = {
    +        log.warn("Connection to mqtt server lost.", cause)
    +      }
    +
    +      override def connectComplete(reconnect: Boolean, serverURI: String): Unit = {
    +        log.info(s"Connect complete $serverURI. Is it a reconnect?: $reconnect")
    +      }
    +    }
    +    client.setCallback(callback)
    +    client.connect(mqttConnectOptions)
    +    client.subscribe(topic)
    +    // It is not possible to initialize offset without `client.connect`
    +    offset = fetchLastProcessedOffset()
    +    initLock.countDown() // Release.
    +  }
    +
    +  /** Stop this source and free any resources it has allocated. */
    +  override def stop(): Unit = {
    +    client.disconnect()
    +    persistence.close()
    +    client.close()
    +  }
    +
    +  /** Returns the maximum available offset for this source. */
    +  override def getOffset: Option[Offset] = {
    +    if (offset == 0) {
    +      None
    +    } else {
    +      Some(LongOffset(offset))
    +    }
    +  }
    +
    +  /**
    +   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
    +   * the batch should begin with the first available record. This method must always return the
    +   * same data for a particular `start` and `end` pair.
    +   */
    +  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    +    val startIndex = start.getOrElse(LongOffset(0L)).asInstanceOf[LongOffset].offset.toInt
    +    val endIndex = end.asInstanceOf[LongOffset].offset.toInt
    +    val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty
    +    // Move consumed messages to persistent store.
    +    (startIndex + 1 to endIndex).foreach { id =>
    +      val element: (String, Timestamp) = messages.getOrElse(id, store.retrieve(id))
    +      data += element
    +      store.store(id, element)
    +      messages.remove(id, element)
    +    }
    +    log.trace(s"Get Batch invoked, ${data.mkString}")
    +    import sqlContext.implicits._
    +    data.toDF("value", "timestamp")
    +  }
    +
    +}
    +
    +class MQTTStreamSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
    +
    +  override def sourceSchema(sqlContext: SQLContext, schema: Option[StructType],
    +      providerName: String, parameters: Map[String, String]): (String, StructType) = {
    +    ("mqtt", MQTTStreamConstants.SCHEMA_DEFAULT)
    +  }
    +
    +  override def createSource(sqlContext: SQLContext, metadataPath: String,
    +      schema: Option[StructType], providerName: String, parameters: Map[String, String]): Source = {
    +
    +    def e(s: String) = new IllegalArgumentException(s)
    +
    +    val brokerUrl: String = parameters.getOrElse("brokerUrl", parameters.getOrElse("path",
    +      throw e("Please provide a `brokerUrl` by specifying path or .options(\"brokerUrl\",...)")))
    +
    +
    +    val persistence: MqttClientPersistence = parameters.get("persistence") match {
    +      case Some("memory") => new MemoryPersistence()
    +      case _ => val localStorage: Option[String] = parameters.get("localStorage")
    +        localStorage match {
    +          case Some(x) => new MqttDefaultFilePersistence(x)
    +          case None => new MqttDefaultFilePersistence()
    +        }
    +    }
    +
    +    val messageParserWithTimeStamp = (x: Array[Byte]) => (new String(x), Timestamp.valueOf(
    +      MQTTStreamConstants.DATE_FORMAT.format(Calendar.getInstance().getTime)))
    --- End diff --
    
    An option here would be to use something like Spark's `DateTimeUtils`, which keeps instances of these classes in a `ThreadLocal` to avoid concurrency issues.
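The sketch below shows the `ThreadLocal` approach. It assumes the same `yyyy-MM-dd HH:mm:ss` pattern as `DATE_FORMAT` in the diff. `SimpleDateFormat` is not thread-safe, so each thread gets its own instance instead of sharing one global object. `ThreadLocalFormatter` is a hypothetical name, and this is not Spark's `DateTimeUtils` code.

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date

// Hypothetical helper showing the ThreadLocal pattern: SimpleDateFormat is not
// thread-safe, so every thread lazily gets its own instance instead of sharing one.
class ThreadLocalFormatter(pattern: String) {
  private val local = new ThreadLocal[SimpleDateFormat] {
    override def initialValue(): SimpleDateFormat = new SimpleDateFormat(pattern)
  }

  def format(d: Date): String = local.get().format(d)

  /** Current time truncated to whole seconds, like the message parser in the diff. */
  def now(): Timestamp = Timestamp.valueOf(format(new Date()))
}
```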


---

[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73406376
  
    --- Diff: sql-streaming-mqtt/README.md ---
    @@ -0,0 +1,121 @@
    +A library for reading data from MQTT Servers using Spark SQL Streaming ( or Structured streaming.). 
    +
    +## Linking
    +
    +Using SBT:
    +
    +```scala
    +libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.0.0"
    +```
    +
    +Using Maven:
    +
    +```xml
    +<dependency>
    +    <groupId>org.apache.bahir</groupId>
    +    <artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
    +    <version>2.0.0</version>
    +</dependency>
    +```
    +
    +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
    +For example, to include it when starting the spark shell:
    +
    +```
    +$ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.0.0
    +```
    +
    +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
    +The `--packages` argument can also be used with `bin/spark-submit`.
    +
    +This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
    +
    +## Examples
    +
    +A SQL Stream can be created with data streams received through MQTT Server using,
    +
    +```scala
    +sqlContext.readStream
    +  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +  .option("topic", "mytopic")
    +  .load("tcp://localhost:1883")
    +
    +```
    +
    +## Enable recovering from failures.
    +
    +Setting values for option `localStorage` and `clientId` helps in recovering in case of a restart, by restoring the state where it left off before the shutdown.
    +
    +```scala
    +sqlContext.readStream
    +  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +  .option("topic", "mytopic")
    +  .option("localStorage", "/path/to/localdir")
    +  .option("clientId", "some-client-id")
    +  .load("tcp://localhost:1883")
    +
    +```
    +
    +### Scala API
    +
    +An example, for scala API to count words from incoming message stream. 
    +
    +```scala
    +    // Create DataFrame representing the stream of input lines from connection to mqtt server
    +    val lines = spark.readStream
    +      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +      .option("topic", topic)
    +      .load(brokerUrl).as[(String, Timestamp)]
    +
    +    // Split the lines into words
    +    val words = lines.map(_._1).flatMap(_.split(" "))
    +
    +    // Generate running word count
    +    val wordCounts = words.groupBy("value").count()
    +
    +    // Start running the query that prints the running counts to the console
    +    val query = wordCounts.writeStream
    +      .outputMode("complete")
    +      .format("console")
    +      .start()
    +
    +    query.awaitTermination()
    +
    +```
    +Please see `MQTTStreamWordCount.scala` for full example.
    +
    +### Java API
    +
    +An example, for Java API to count words from incoming message stream. 
    +
    +```java
    +   
    +        // Create DataFrame representing the stream of input lines from connection to mqtt server.
    +        Dataset<String> lines = spark
    +                .readStream()
    +                .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +                .option("topic", topic)
    +                .load(brokerUrl).select("value").as(Encoders.STRING());
    +
    +        // Split the lines into words
    +        Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    +            @Override
    +            public Iterator<String> call(String x) {
    +                return Arrays.asList(x.split(" ")).iterator();
    +            }
    +        }, Encoders.STRING());
    +
    +        // Generate running word count
    +        Dataset<Row> wordCounts = words.groupBy("value").count();
    +
    +        // Start running the query that prints the running counts to the console
    +        StreamingQuery query = wordCounts.writeStream()
    +                .outputMode("complete")
    +                .format("console")
    +                .start();
    +
    +        query.awaitTermination();
    +```
    +
    +Please see `JavaMQTTStreamWordCount.java` for full example.
    --- End diff --
    
    Please add a link to the example directory. Also, the example file is named `MQTTStreamWordCount.java`.


---

[GitHub] bahir pull request #13: Add SQL Streaming MQTT support.

Posted by frreiss <gi...@git.apache.org>.
Github user frreiss commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r72194637
  
    --- Diff: streaming-mqtt/dstream/python/dstream.py ---
    @@ -0,0 +1,643 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +import sys
    +import operator
    +import time
    +from itertools import chain
    +from datetime import datetime
    +
    +if sys.version < "3":
    +    from itertools import imap as map, ifilter as filter
    +
    +from py4j.protocol import Py4JJavaError
    +
    +from pyspark import RDD
    +from pyspark.storagelevel import StorageLevel
    +from pyspark.streaming.util import rddToFileName, TransformFunction
    +from pyspark.rdd import portable_hash
    +from pyspark.resultiterable import ResultIterable
    +
    +__all__ = ["DStream"]
    +
    +
    +class DStream(object):
    --- End diff --
    
    Should this class be in Bahir?


---

[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73406691
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/BahirUtils.scala ---
    @@ -0,0 +1,48 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.utils
    +
    +import java.io.{File, IOException}
    +import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
    +import java.nio.file.attribute.BasicFileAttributes
    +
    +object BahirUtils extends Logging {
    --- End diff --
    
    If this is used only for testing, should it go under the test sources?



[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73461106
  
    --- Diff: sql-streaming-mqtt/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister ---
    @@ -0,0 +1 @@
    +org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider
    --- End diff --
    
    AFAIK, this particular file does not allow comments. See [here](https://github.com/apache/spark/blob/master/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister)
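For comparison, the `java.util.ServiceLoader` documentation gives the rules for provider-configuration files:

- Each line holds one fully qualified class name.
- Surrounding whitespace and blank lines are ignored.
- `#` is the comment character.
- Duplicate names are ignored.

Given that, it may be worth double-checking whether a `#`-prefixed license header would actually be rejected. The sketch below mirrors only those documented rules. `ProviderFileParser` is a hypothetical helper, not a JDK or Spark class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mimics the documented ServiceLoader provider-file rules.
final class ProviderFileParser {

    /** Returns the provider class names listed in a META-INF/services file body. */
    static List<String> parse(String fileContents) {
        List<String> providers = new ArrayList<>();
        for (String rawLine : fileContents.split("\n", -1)) {
            String line = rawLine;
            int hash = line.indexOf('#');
            if (hash >= 0) {
                // Everything after the first '#' on a line is a comment.
                line = line.substring(0, hash);
            }
            line = line.trim(); // surrounding spaces, tabs and '\r' are ignored
            if (!line.isEmpty() && !providers.contains(line)) {
                providers.add(line); // blank lines and duplicate names are skipped
            }
        }
        return providers;
    }
}
```

With a license header in front of the single provider line, the parser still returns exactly one class name.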



[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by frreiss <gi...@git.apache.org>.
Github user frreiss commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73423700
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
    +
    +  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
    +
    +  private val messages = new TrieMap[Int, (String, Timestamp)]
    +
    +  private val initLock = new CountDownLatch(1)
    +
    +  private var offset = 0
    --- End diff --
    
    I think you need to initialize this variable from something stored in a checkpoint. Otherwise you'll get a different answer at a given offset into the stream if the source dies and is restarted.
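A minimal sketch of the idea, assuming a plain local file is an acceptable checkpoint location. The counter restores its last persisted value when it is constructed, so a restarted source continues numbering where it stopped instead of starting again at 0. `RecoverableOffsetCounter` is hypothetical. The later revision of this PR, quoted further down, takes a similar route with `fetchLastProcessedOffset()` backed by `LocalMessageStore`.

Persisting the counter alone is probably not sufficient. A replayed batch only returns the same rows if the messages themselves can also be recovered by offset.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: an offset counter whose value survives a restart of the source.
final class RecoverableOffsetCounter {
    private final Path checkpointFile;
    private long offset;

    RecoverableOffsetCounter(Path checkpointFile) {
        this.checkpointFile = checkpointFile;
        this.offset = readCheckpoint(checkpointFile);
    }

    private static long readCheckpoint(Path file) {
        if (!Files.exists(file)) {
            return 0L; // first start: nothing to recover
        }
        try {
            String stored = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return stored.isEmpty() ? 0L : Long.parseLong(stored);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Assigns the next offset and persists it before handing it out. */
    synchronized long next() {
        offset += 1;
        try {
            Files.write(checkpointFile, Long.toString(offset).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offset;
    }

    synchronized long current() {
        return offset;
    }
}
```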



[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73461326
  
    --- Diff: sql-streaming-mqtt/README.md ---
    @@ -0,0 +1,121 @@
    +A library for reading data from MQTT servers using Spark SQL Streaming (or Structured Streaming).
    +
    +## Linking
    +
    +Using SBT:
    +
    +```scala
    +libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.0.0"
    +```
    +
    +Using Maven:
    +
    +```xml
    +<dependency>
    +    <groupId>org.apache.bahir</groupId>
    +    <artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
    +    <version>2.0.0</version>
    +</dependency>
    +```
    +
    +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
    +For example, to include it when starting the spark shell:
    +
    +```
    +$ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.0.0
    +```
    +
    +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
    +The `--packages` argument can also be used with `bin/spark-submit`.
    +
    +This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
    +
    +## Examples
    +
    +A SQL Stream can be created from data received through an MQTT server as follows:
    +
    +```scala
    +sqlContext.readStream
    +  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +  .option("topic", "mytopic")
    +  .load("tcp://localhost:1883")
    +
    +```
    +
    +## Enabling recovery from failures
    +
    +Setting the `localStorage` and `clientId` options helps the source recover after a restart, by restoring the state from where it left off before the shutdown.
    +
    +```scala
    +sqlContext.readStream
    +  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +  .option("topic", "mytopic")
    +  .option("localStorage", "/path/to/localdir")
    +  .option("clientId", "some-client-id")
    +  .load("tcp://localhost:1883")
    +
    +```
    +
    +### Scala API
    +
    +An example using the Scala API to count words from an incoming message stream:
    +
    +```scala
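    +    import java.sql.Timestamp
    +    import spark.implicits._ // encoders for .as[(String, Timestamp)], .map and .flatMap
    +
    +    // Create a Dataset representing the stream of input lines from the connection to the MQTT server
    +    val lines = spark.readStream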
    +      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +      .option("topic", topic)
    +      .load(brokerUrl).as[(String, Timestamp)]
    +
    +    // Split the lines into words
    +    val words = lines.map(_._1).flatMap(_.split(" "))
    +
    +    // Generate running word count
    +    val wordCounts = words.groupBy("value").count()
    +
    +    // Start running the query that prints the running counts to the console
    +    val query = wordCounts.writeStream
    +      .outputMode("complete")
    +      .format("console")
    +      .start()
    +
    +    query.awaitTermination()
    +
    +```
    +Please see `MQTTStreamWordCount.scala` for the full example.
    +
    +### Java API
    +
    +An example using the Java API to count words from an incoming message stream:
    +
    +```java
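    +import java.util.Arrays;
    +import java.util.Iterator;
    +
    +import org.apache.spark.api.java.function.FlatMapFunction;
    +import org.apache.spark.sql.Dataset;
    +import org.apache.spark.sql.Encoders;
    +import org.apache.spark.sql.Row;
    +import org.apache.spark.sql.streaming.StreamingQuery;
    +
    +        // Create a Dataset representing the stream of input lines from the connection to the MQTT server.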
    +        Dataset<String> lines = spark
    +                .readStream()
    +                .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +                .option("topic", topic)
    +                .load(brokerUrl).select("value").as(Encoders.STRING());
    +
    +        // Split the lines into words
    +        Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    +            @Override
    +            public Iterator<String> call(String x) {
    +                return Arrays.asList(x.split(" ")).iterator();
    +            }
    +        }, Encoders.STRING());
    +
    +        // Generate running word count
    +        Dataset<Row> wordCounts = words.groupBy("value").count();
    +
    +        // Start running the query that prints the running counts to the console
    +        StreamingQuery query = wordCounts.writeStream()
    +                .outputMode("complete")
    +                .format("console")
    +                .start();
    +
    +        query.awaitTermination();
    +```
    +
    +Please see `JavaMQTTStreamWordCount.java` for the full example.
    --- End diff --
    
    Hi, thanks for looking. I am referring to [this](https://github.com/apache/bahir/pull/13/files/196174029ab62e1deab0fad93f9b5e655ebf3327#diff-c6776a5637dfc80566ea03ff40bda94e) file. I can update the README with a link to it.



[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73649853
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
    +
    +  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
    +
    +  private val messages = new TrieMap[Int, (String, Timestamp)]
    +
    +  private val initLock = new CountDownLatch(1)
    +
    +  private var offset = 0
    +
    +  private var client: MqttClient = _
    +
    +  private def fetchLastProcessedOffset(): Int = {
    +    Try(store.maxProcessedOffset) match {
    +      case Success(x) =>
    +        log.info(s"Recovering from last stored offset $x")
    +        x
    +      case Failure(e) => 0
    +    }
    +  }
    +
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    client = new MqttClient(brokerUrl, clientId, persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    // This is required to support recovery. TODO: configurable ?
    +    mqttConnectOptions.setCleanSession(false)
    +
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
    --- End diff --
    
    No, definitely not for all QoS levels. If this method fails to return normally, a message may be redelivered.
    In case of a sudden crash, we may lose messages anyway. What do you think our strategy should be here?
    
    About message ordering:
    According to this [post](http://stackoverflow.com/questions/30955110/is-message-order-preserved), message ordering can be ensured at the server end. Should we implement it at the client end? And do you think the message id can be used to ensure ordering?
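On the message-id question: in MQTT 3.1.1 the packet identifier is a 16-bit value. The sender may reuse it once the corresponding exchange has completed, and QoS 0 messages carry none. It therefore does not look like a reliable stream-wide ordering key.

If client-side ordering were needed, one option is an application-level sequence number carried in each payload. That is an assumption about the publisher, not something MQTT provides. Below is a minimal sketch of such a reorder buffer; the `ReorderBuffer` class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical reorder buffer. It assumes every payload carries an application-level
// sequence number starting at 0; MQTT itself provides no such field.
final class ReorderBuffer {
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private long nextExpected = 0L;

    /** Accepts one message and returns every message that can now be released in order. */
    synchronized List<String> offer(long sequence, String payload) {
        List<String> released = new ArrayList<>();
        if (sequence < nextExpected) {
            return released; // already released earlier: a redelivered duplicate, drop it
        }
        pending.putIfAbsent(sequence, payload);
        // Drain the contiguous run that starts at nextExpected.
        while (!pending.isEmpty() && pending.firstKey() == nextExpected) {
            released.add(pending.pollFirstEntry().getValue());
            nextExpected++;
        }
        return released;
    }
}
```

A gap that is never filled would hold later messages indefinitely. A real implementation would therefore also need a timeout or a size bound.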
    




[GitHub] bahir issue #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    LGTM
    
    Merging if there are no more comments.



[GitHub] bahir issue #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    @lresende Can you please review this PR?
    @mridulm Can you please take another look and check whether the concurrency issue you spotted earlier still exists?



[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73405558
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    --- End diff --
    
    Not thread-safe; see the comments below on `Calendar` usage.
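`SimpleDateFormat` keeps mutable internal state, and its documentation recommends a separate instance per thread. Sharing one instance between the MQTT callback thread and Spark threads can therefore corrupt results.

On Java 8+, one common alternative is `java.time.format.DateTimeFormatter`, which is immutable and thread-safe. A `ThreadLocal<SimpleDateFormat>` is another option. Below is a minimal sketch, assuming the same `yyyy-MM-dd HH:mm:ss` pattern and the JVM's default time zone; `MqttTimestamps` is a hypothetical name.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// DateTimeFormatter is immutable and thread-safe, so one shared constant can be used
// concurrently by the MQTT callback thread and by Spark threads.
final class MqttTimestamps {
    static final DateTimeFormatter DATE_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Formats a java.sql.Timestamp (the type used in the source's schema). */
    static String format(Timestamp ts) {
        return DATE_FORMAT.format(ts.toLocalDateTime());
    }

    /** Parses text in the same pattern back into a Timestamp in the default time zone. */
    static Timestamp parse(String text) {
        return Timestamp.valueOf(LocalDateTime.parse(text, DATE_FORMAT));
    }
}
```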



[GitHub] bahir issue #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    Based on the discussion on [spark-dev](https://www.mail-archive.com/dev@spark.apache.org/msg15209.html), it seems that some of the issues raised by @frreiss are current limitations of Spark Structured Streaming. Given that, I think we are good to merge this and address enhancements in new JIRAs/PRs; we may need to coordinate with the Spark runtime changes they require.



[GitHub] bahir issue #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    @lresende If the release can wait a bit, that would be good.



[GitHub] bahir pull request #13: Add SQL Streaming MQTT support.

Posted by frreiss <gi...@git.apache.org>.
Github user frreiss commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r72195316
  
    --- Diff: streaming-mqtt/sql/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStream.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, Encoder, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{TimestampType, StringType, StructField, StructType}
    +
    +
    +object MQTTStream {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStream(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  private val offset = new AtomicLong(0)
    +  override def schema: StructType = MQTTStream.SCHEMA_DEFAULT
    +
    +  @GuardedBy("this")
    +  private var messages = new ArrayBuffer[(String, Timestamp)]
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = {
    +        offset.getAndIncrement()
    +        messages += messageParser(message.getPayload)
    --- End diff --
    
    Shouldn't there be some locking code here?
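One way to address this, sketched under the assumption that a single lock is acceptable for the callback's throughput:

- The writer (Paho's callback thread) and the readers (`getOffset`/`getBatch` on Spark's side) take the same lock.
- The offset is derived from the buffer size under that lock.

With a separate `AtomicLong`, the counter can be incremented before the message is appended. Under one lock, the two can never be observed out of step. `MessageBuffer` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer behind the MQTT callback: one lock covers both the write path
// and the read paths, so offsets and contents are always consistent.
final class MessageBuffer {
    private final Object lock = new Object();
    private final List<String> messages = new ArrayList<>(); // guarded by lock

    /** Called from messageArrived: appends and returns the offset assigned to the message. */
    long append(String message) {
        synchronized (lock) {
            messages.add(message);
            return messages.size(); // offset == number of messages buffered so far
        }
    }

    /** Returns a copy of the messages in the offset range (start, end]. */
    List<String> slice(long start, long end) {
        synchronized (lock) {
            return new ArrayList<>(messages.subList((int) start, (int) end));
        }
    }
}
```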



[GitHub] bahir issue #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    Hi Fred and Luciano, I have gone through the discussion on spark-dev. In Spark 2.0, the engine needs at most the one previous batch. Can we use this information to clean up, e.g. by discarding data older than two batches? Then we can say that this connector supports Spark 2.0 only, due to this limitation (JIRA id ).
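A minimal sketch of the proposed cleanup. It assumes, as stated in the comment, that the engine re-requests at most the previous batch, so retaining two batches leaves one batch of slack. `BoundedMessageStore` and `recordBatchEnd` are hypothetical names. Where the batch boundaries would be recorded (for example when `getBatch` is called) is also an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical store that keeps only the messages of the most recent `retainedBatches`
// batches, plus any messages that have not yet been assigned to a batch.
final class BoundedMessageStore {
    private final ConcurrentSkipListMap<Long, String> messages = new ConcurrentSkipListMap<>();
    private final Deque<Long> batchEnds = new ArrayDeque<>();
    private final int retainedBatches;

    BoundedMessageStore(int retainedBatches) {
        this.retainedBatches = retainedBatches;
    }

    void put(long offset, String message) {
        messages.put(offset, message);
    }

    /** Records that a batch ending at endOffset was handed to Spark, then evicts old data. */
    synchronized void recordBatchEnd(long endOffset) {
        batchEnds.addLast(endOffset);
        while (batchEnds.size() > retainedBatches) {
            long evictUpTo = batchEnds.removeFirst();
            // Drop every message at or before the end of the evicted batch.
            messages.headMap(evictUpTo, true).clear();
        }
    }

    int size() {
        return messages.size();
    }
}
```

With `retainedBatches = 2`, recording a third batch end drops the messages of the first batch.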



[GitHub] bahir issue #13: Add SQL Streaming MQTT support.

Posted by ckadner <gi...@git.apache.org>.
Github user ckadner commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    @ScrapCodes - Could you provide some background on this PR? I did not see any discussion on the [Bahir mailing lists](https://www.mail-archive.com/dev@bahir.apache.org/) or any [Bahir issues](https://issues.apache.org/jira/issues/?jql=project%20%3D%20BAHIR%20and%20%28summary%20~%20SQL%20or%20summary%20~%20sql%29) related to this.



[GitHub] bahir issue #13: Add SQL Streaming MQTT support.

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the issue:

    https://github.com/apache/bahir/pull/13
  
    I have created an issue to track this: BAHIR-39.



[GitHub] bahir pull request #13: Add SQL Streaming MQTT support.

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r72379877
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStream.scala ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStream {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStream(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  override def schema: StructType = MQTTStream.SCHEMA_DEFAULT
    +
    +  @GuardedBy("this")
    +  private var messages = new ArrayBuffer[(String, Timestamp)]
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = {
    +        messages += messageParser(message.getPayload)
    +        log.trace(s"Message arrived, $topic_ $message")
    +      }
    +
    +      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
    +      }
    +
    +      override def connectionLost(cause: Throwable): Unit = {
    +        log.warn("Connection to mqtt server lost.", cause)
    +      }
    +
    +      override def connectComplete(reconnect: Boolean, serverURI: String): Unit = {
    +        log.info(s"connect complete $serverURI. Is it a reconnect?: $reconnect")
    +      }
    +    }
    +    client.setCallback(callback)
    +    client.connect(mqttConnectOptions)
    +    client.subscribe(topic)
    +  }
    +
    +  /** Stop this source and free any resources it has allocated. */
    +  override def stop(): Unit = {
    +  }
    +
    +  /** Returns the maximum available offset for this source. */
    +  override def getOffset: Option[Offset] = {
    +    if (messages.isEmpty) {
    +      None
    +    } else {
    +      Some(LongOffset(messages.size))
    +    }
    --- End diff --
    
    `@GuardedBy` here won't work, right?
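`@GuardedBy` (as imported in the diff from `javax.annotation.concurrent`) is documentation for readers and static-analysis tools. It acquires no lock at runtime, so in this diff `messageArrived` and `getOffset` still access `messages` without synchronization.

There is a further subtlety. The callback is an anonymous class, so a bare `synchronized` inside it locks the callback instance, not the enclosing source.

The Java sketch below locks the enclosing instance explicitly. The names are hypothetical, and `Consumer<String>` stands in for the MQTT callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical sketch: the source's state is guarded by the source instance itself.
final class GuardedSource {
    private final List<String> messages = new ArrayList<>(); // guarded by GuardedSource.this

    // Anonymous callback class, analogous to the MqttCallbackExtended in the diff.
    final Consumer<String> callback = new Consumer<String>() {
        @Override
        public void accept(String message) {
            // synchronized (this) here would lock the callback object and give no mutual
            // exclusion with getOffset(); lock the enclosing source instead.
            synchronized (GuardedSource.this) {
                messages.add(message);
            }
        }
    };

    /** Reads under the same lock (GuardedSource.this) that the writer uses. */
    synchronized Optional<Long> getOffset() {
        if (messages.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of((long) messages.size());
    }
}
```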



[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/bahir/pull/13

