You are viewing a plain text version of this content; the canonical hyperlink was removed during plain-text conversion (see the repository/commit URLs below for the canonical references).
Posted to commits@bahir.apache.org by lr...@apache.org on 2019/01/10 00:14:35 UTC

bahir git commit: [BAHIR-175] Fix MQTT recovery after checkpoint

Repository: bahir
Updated Branches:
  refs/heads/master a45bd8421 -> 5cfd7ac31


[BAHIR-175] Fix MQTT recovery after checkpoint

Closes #79


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/5cfd7ac3
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/5cfd7ac3
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/5cfd7ac3

Branch: refs/heads/master
Commit: 5cfd7ac3154621b1780e2eb4719731030fc7d80a
Parents: a45bd84
Author: Lukasz Antoniak <lu...@gmail.com>
Authored: Wed Dec 19 13:23:58 2018 -0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Jan 9 16:14:00 2019 -0800

----------------------------------------------------------------------
 .../sql/streaming/mqtt/MQTTStreamSource.scala   |  9 ++++--
 .../bahir/sql/streaming/mqtt/MessageStore.scala |  9 +++++-
 .../streaming/mqtt/MQTTStreamSourceSuite.scala  | 34 +++++++++++++++++++-
 3 files changed, 47 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/5cfd7ac3/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
index a40ff51..7146ecc 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
@@ -101,9 +101,9 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
   /* Older than last N messages, will not be checked for redelivery. */
   val backLog = options.getInt("autopruning.backlog", 500)
 
-  private val store = new LocalMessageStore(persistence)
+  private[mqtt] val store = new LocalMessageStore(persistence)
 
-  private val messages = new TrieMap[Long, MQTTMessage]
+  private[mqtt] val messages = new TrieMap[Long, MQTTMessage]
 
   @GuardedBy("this")
   private var currentOffset: LongOffset = LongOffset(-1L)
@@ -125,6 +125,7 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
         val mqttMessage = new MQTTMessage(message, topic_)
         val offset = currentOffset.offset + 1L
         messages.put(offset, mqttMessage)
+        store.store(offset, mqttMessage)
         currentOffset = LongOffset(offset)
         log.trace(s"Message arrived, $topic_ $mqttMessage")
       }
@@ -172,7 +173,8 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
     val rawList: IndexedSeq[MQTTMessage] = synchronized {
       val sliceStart = LongOffset.convert(startOffset).get.offset + 1
       val sliceEnd = LongOffset.convert(endOffset).get.offset + 1
-      for (i <- sliceStart until sliceEnd) yield messages(i)
+      for (i <- sliceStart until sliceEnd) yield
+        messages.getOrElse(i, store.retrieve[MQTTMessage](i))
     }
     val spark = SparkSession.getActiveSession.get
     val numPartitions = spark.sparkContext.defaultParallelism
@@ -218,6 +220,7 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
 
     (lastOffsetCommitted.offset until newOffset.offset).foreach { x =>
       messages.remove(x + 1)
+      store.remove(x + 1)
     }
     lastOffsetCommitted = newOffset
   }

http://git-wip-us.apache.org/repos/asf/bahir/blob/5cfd7ac3/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MessageStore.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MessageStore.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MessageStore.scala
index d7d2657..30ec7a6 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MessageStore.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MessageStore.scala
@@ -39,6 +39,9 @@ trait MessageStore {
   /** Highest offset we have stored */
   def maxProcessedOffset: Long
 
+  /** Remove message corresponding to a given id. */
+  def remove[T](id: Long): Unit
+
 }
 
 private[mqtt] class MqttPersistableData(bytes: Array[Byte]) extends MqttPersistable {
@@ -118,7 +121,7 @@ private[mqtt] class LocalMessageStore(val persistentStore: MqttClientPersistence
 
   import scala.collection.JavaConverters._
 
-  def maxProcessedOffset: Long = {
+  override def maxProcessedOffset: Long = {
     val keys: util.Enumeration[_] = persistentStore.keys()
     keys.asScala.map(x => x.toString.toInt).max
   }
@@ -140,4 +143,8 @@ private[mqtt] class LocalMessageStore(val persistentStore: MqttClientPersistence
     serializer.deserialize(get(id))
   }
 
+  override def remove[T](id: Long): Unit = {
+    persistentStore.remove(id.toString)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/5cfd7ac3/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
index a7eb770..c4e340c 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
@@ -23,8 +23,13 @@ import java.util.Optional
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions
 import org.eclipse.paho.client.mqttv3.MqttException
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
 import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time
+import org.scalatest.time.Span
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.sql._
@@ -33,7 +38,8 @@ import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQuery}
 
 import org.apache.bahir.utils.FileHelper
 
-class MQTTStreamSourceSuite extends SparkFunSuite with SharedSparkContext with BeforeAndAfter {
+class MQTTStreamSourceSuite extends SparkFunSuite
+    with Eventually with SharedSparkContext with BeforeAndAfter {
 
   protected var mqttTestUtils: MQTTTestUtils = _
   protected val tempDir: File = new File(System.getProperty("java.io.tmpdir") + "/mqtt-test/")
@@ -136,6 +142,32 @@ class BasicMQTTSourceSuite extends MQTTStreamSourceSuite {
     assert(resultBuffer.head == sendMessage)
   }
 
+  test("messages persisted in store") {
+    val sqlContext: SQLContext = SparkSession.builder().getOrCreate().sqlContext
+    val source = new MQTTStreamSource(
+      DataSourceOptions.empty(), "tcp://" + mqttTestUtils.brokerUri, new MemoryPersistence(),
+      "test", "clientId", new MqttConnectOptions(), 2
+    )
+    val payload = "MQTT is a message queue."
+    mqttTestUtils.publishData("test", payload)
+    eventually(timeout(Span(5, time.Seconds)), interval(Span(500, time.Millis))) {
+      val message = source.store.retrieve(0).asInstanceOf[Object]
+      assert(message != null)
+    }
+    // Clear in-memory cache to simulate recovery.
+    source.messages.clear()
+    source.setOffsetRange(Optional.empty(), Optional.empty())
+    var message: Row = null
+    for (f <- source.createDataReaderFactories().asScala) {
+      val dataReader = f.createDataReader()
+      if (dataReader.next()) {
+        message = dataReader.get()
+      }
+    }
+    source.commit(source.getCurrentOffset)
+    assert(payload == new String(message.getAs[Array[Byte]](2), "UTF-8"))
+  }
+
   test("no server up") {
     val provider = new MQTTStreamSourceProvider
     val sqlContext: SQLContext = SparkSession.builder().getOrCreate().sqlContext