You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2018/11/08 03:38:03 UTC

bahir git commit: [BAHIR-166] Migrate akka sql streaming source to DataSource v2 API

Repository: bahir
Updated Branches:
  refs/heads/master b3902bac6 -> be1effaaf


[BAHIR-166] Migrate akka sql streaming source to DataSource v2 API

Migrate akka sql streaming source to DataSource v2 API.

Closes #67


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/be1effaa
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/be1effaa
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/be1effaa

Branch: refs/heads/master
Commit: be1effaaf7cfde28d19e032e038694e01fbf169b
Parents: b3902ba
Author: shimamoto <sh...@apache.org>
Authored: Thu May 31 17:15:04 2018 +0900
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Nov 7 19:28:11 2018 -0800

----------------------------------------------------------------------
 pom.xml                                         |   8 +-
 sql-streaming-akka/README.md                    |   2 +-
 .../sql/streaming/akka/AkkaStreamSource.scala   | 183 ++++++++++++-------
 .../bahir/sql/streaming/akka/LongOffset.scala   |  54 ++++++
 .../bahir/sql/streaming/akka/MessageStore.scala |  18 +-
 .../streaming/akka/AkkaStreamSourceSuite.scala  |  13 +-
 .../sql/streaming/akka/AkkaTestUtils.scala      |   5 +-
 .../streaming/akka/ActorWordCount.scala         |   8 +-
 .../spark/streaming/akka/ActorReceiver.scala    |   5 +-
 .../spark/streaming/akka/AkkaStreamSuite.scala  |   4 +-
 .../streaming/zeromq/ZeroMQWordCount.scala      |   8 +-
 11 files changed, 214 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8282346..13407bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,7 @@
   <modules>
     <module>sql-cloudant</module>
     <module>streaming-akka</module>
-    <!-- <module>sql-streaming-akka</module> Disabling akka sql module, until it is updated to run with datasource v2 API. -->
+    <module>sql-streaming-akka</module>
     <module>streaming-mqtt</module>
     <module>sql-streaming-mqtt</module>
     <module>streaming-twitter</module>
@@ -105,7 +105,7 @@
     <mqtt.paho.client>1.1.0</mqtt.paho.client>
     <!-- Streaming Akka connector -->
     <akka.group>com.typesafe.akka</akka.group>
-    <akka.version>2.4.20</akka.version>
+    <akka.version>2.5.12</akka.version>
     <akka_zeromq.version>2.3.16</akka_zeromq.version>
 
     <protobuf.version>2.5.0</protobuf.version>
@@ -569,7 +569,7 @@
               <include>**/*Suite.java</include>
             </includes>
             <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-            <argLine>-ea -Xmx3g -Xss4m -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
+            <argLine>-Xmx3g -Xss4096k -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
             <environmentVariables>
               <!--
                 Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes
@@ -1042,6 +1042,7 @@
       </modules>
     </profile>
 
+    <!--
     <profile>
       <id>scala-2.10</id>
       <activation>
@@ -1124,6 +1125,7 @@
         </plugins>
       </build>
     </profile>
+    -->
 
     <profile>
       <id>test-java-home</id>

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/README.md
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/README.md b/sql-streaming-akka/README.md
index 3d9d17b..29685ee 100644
--- a/sql-streaming-akka/README.md
+++ b/sql-streaming-akka/README.md
@@ -45,7 +45,7 @@ Setting values for option `persistenceDirPath` helps in recovering in case of a
                        
 ## Configuration options.
                        
-This source uses [Akka Actor api](http://doc.akka.io/api/akka/2.4/akka/actor/Actor.html).
+This source uses [Akka Actor api](http://doc.akka.io/api/akka/2.5/akka/actor/Actor.html).
                        
 * `urlOfPublisher` The url of Publisher or Feeder actor that the Receiver actor connects to. Set this as the tcp url of the Publisher or Feeder actor.
 * `persistenceDirPath` By default it is used for storing incoming messages on disk.

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
index 96d892f..3f2101c 100644
--- a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
@@ -20,16 +20,18 @@ package org.apache.bahir.sql.streaming.akka
 import java.nio.ByteBuffer
 import java.sql.Timestamp
 import java.text.SimpleDateFormat
-import java.util.{Calendar, Objects}
+import java.util
+import java.util.{Calendar, Objects, Optional}
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicInteger
+import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.JavaConverters._
 import scala.collection.concurrent.TrieMap
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
-import scala.concurrent.Future
 import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
 
 import akka.actor._
 import akka.actor.SupervisorStrategy.{Escalate, Restart}
@@ -38,9 +40,12 @@ import akka.util.Timeout
 import com.typesafe.config.ConfigFactory
 import org.rocksdb.{Options, RocksDB}
 
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
-import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
 import org.apache.bahir.utils.Logging
@@ -87,32 +92,29 @@ private[akka] case class IteratorData(iterator: Iterator[String]) extends ActorR
 private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
 private[akka] object Ack extends ActorReceiverData
 
-class AkkaStreamSource(urlOfPublisher: String,
-                       persistence: RocksDB, sqlContext: SQLContext,
-                       messageParser: String => (String, Timestamp))
-  extends Source with Logging {
+class AkkaMicroBatchReader(urlOfPublisher: String,
+                           persistence: RocksDB,
+                           messageParser: String => (String, Timestamp))
+  extends MicroBatchReader with Logging {
 
-  override def schema: StructType = AkkaStreamConstants.SCHEMA_DEFAULT
+  private val store = new LocalMessageStore(persistence, SparkEnv.get.conf)
 
-  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
-
-  private val messages = new TrieMap[Int, (String, Timestamp)]()
+  private val messages = new TrieMap[Long, (String, Timestamp)]()
 
   private val initLock = new CountDownLatch(1)
 
-  private var offset = 0
+  @GuardedBy("this")
+  private var currentOffset: LongOffset = LongOffset(-1L)
+
+  @GuardedBy("this")
+  private var lastOffsetCommitted: LongOffset = LongOffset(-1L)
+
+  private var startOffset: Offset = _
+  private var endOffset: Offset = _
 
   private var actorSystem: ActorSystem = _
   private var actorSupervisor: ActorRef = _
 
-  private def fetchLastProcessedOffset(): Int = {
-    Try(store.maxProcessedOffset) match {
-      case Success(x) =>
-        log.info(s"Recovering from last stored offset $x")
-        x
-      case Failure(e) => 0
-    }
-  }
 
   initialize()
   private def initialize(): Unit = {
@@ -157,7 +159,7 @@ class AkkaStreamSource(urlOfPublisher: String,
 
         case data =>
           initLock.await()
-          var temp = offset + 1
+          var temp = currentOffset.offset + 1
 
           data match {
             case IteratorData(iterator) =>
@@ -198,80 +200,133 @@ class AkkaStreamSource(urlOfPublisher: String,
               val workers = context.children
               sender() ! Statistics(n.get(), workers.size, hiccups.get(), workers.mkString("\n"))
           }
-          offset = temp
+          currentOffset = LongOffset(temp)
       }
     }
 
     actorSystem = AkkaStreamConstants.defaultActorSystemCreator()
     actorSupervisor = actorSystem.actorOf(Props(new Supervisor), "Supervisor")
-    offset = fetchLastProcessedOffset()
+    if (store.maxProcessedOffset > 0) {
+      currentOffset = LongOffset(store.maxProcessedOffset)
+    }
     initLock.countDown()
   }
 
+  // This method is only used for unit test
+  private[akka] def getCurrentOffset: LongOffset = {
+    currentOffset.copy()
+  }
+
+
+  override def getEndOffset: Offset = {
+    Option(endOffset).getOrElse(throw new IllegalStateException("end offset not set"))
+  }
+
+  override def getStartOffset: Offset = {
+    Option(startOffset).getOrElse(throw new IllegalStateException("start offset not set"))
+  }
+
+  override def setOffsetRange(start: Optional[Offset],
+                              end: Optional[Offset]): Unit = synchronized {
+    startOffset = start.orElse(LongOffset(-1L))
+    endOffset = end.orElse(currentOffset)
+  }
+
+  override def commit(end: Offset): Unit = synchronized {
+    val newOffset = LongOffset.convert(end).getOrElse(
+      sys.error(s"AkkaMicroBatchReader.commit() received an offset ($end) that did not " +
+        s"originate with an instance of this class")
+    )
+
+    val offsetDiff = newOffset.offset - lastOffsetCommitted.offset
+
+    if (offsetDiff < 0) {
+      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+    }
+
+    (lastOffsetCommitted.offset until newOffset.offset).foreach { x =>
+      messages.remove(x + 1)
+    }
+    lastOffsetCommitted = newOffset
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    LongOffset(json.toLong)
+  }
+
   override def stop(): Unit = {
     actorSupervisor ! PoisonPill
     Persistence.close()
-    actorSystem.shutdown()
-    actorSystem.awaitTermination()
+    Await.ready(actorSystem.terminate(), Duration.Inf)
   }
 
-  override def getOffset: Option[Offset] = {
-    if (offset == 0) {
-      None
-    } else {
-      Some(LongOffset(offset))
+  override def readSchema(): StructType = AkkaStreamConstants.SCHEMA_DEFAULT
+
+  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {
+    assert(startOffset != null && endOffset != null,
+      "start offset and end offset should already be set before create read tasks.")
+
+    val (start, end) = synchronized {
+      (LongOffset.convert(startOffset).get.offset + 1, LongOffset.convert(endOffset).get.offset + 1)
+    }
+    val rawList = for (i <- start until end) yield {
+      store.store(i, messages(i))
+      messages(i)
+    }
+
+    val numPartitions = SparkSession.getActiveSession.get.sparkContext.defaultParallelism
+
+    val slices = Array.fill(numPartitions)(new ArrayBuffer[(String, Timestamp)])
+    rawList.zipWithIndex.foreach { case (r, idx) =>
+      slices(idx % numPartitions).append(r)
     }
-  }
 
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-    val startIndex = start.getOrElse(LongOffset(0L)).asInstanceOf[LongOffset].offset.toInt
-    val endIndex = end.asInstanceOf[LongOffset].offset.toInt
-    val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty
+    (0 until numPartitions).map { i =>
+      val slice = slices(i)
+      new DataReaderFactory[Row] {
+        override def createDataReader(): DataReader[Row] = new DataReader[Row] {
+          private var currentIdx = -1
 
-    ((startIndex + 1) to endIndex).foreach { id =>
-      val element: (String, Timestamp) = messages.getOrElse(id,
-        store.retrieve[(String, Timestamp)](id).orNull)
+          override def next(): Boolean = {
+            currentIdx += 1
+            currentIdx < slice.size
+          }
 
-      if (!Objects.isNull(element)) {
-        data += element
-        store.store(id, element)
+          override def get(): Row = {
+            Row.fromTuple(slice(currentIdx))
+          }
+
+          override def close(): Unit = {}
+        }
       }
-      messages.remove(id, element)
-    }
-    log.trace(s"Get Batch invoked, ${data.mkString}")
-    import sqlContext.implicits._
-    data.toDF("value", "timestamp")
+    }.toList.asJava
   }
-}
 
-class AkkaStreamSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
+}
 
-  override def sourceSchema(sqlContext: SQLContext, schema: Option[StructType],
-                            providerName: String, parameters: Map[String, String])
-  : (String, StructType) = ("akka", AkkaStreamConstants.SCHEMA_DEFAULT)
+class AkkaStreamSourceProvider extends MicroBatchReadSupport with DataSourceRegister with Logging {
 
-  override def createSource(sqlContext: SQLContext, metadataPath: String,
-                            schema: Option[StructType], providerName: String,
-                            parameters: Map[String, String]): Source = {
+  override def shortName(): String = "akka"
 
-    def e(s: String) = new IllegalArgumentException(s)
+  override def createMicroBatchReader(schema: Optional[StructType],
+                                      metadataPath: String,
+                                      options: DataSourceOptions): MicroBatchReader = {
+    val parameters = options.asMap().asScala.toMap
 
-    val urlOfPublisher: String = parameters.getOrElse("urlOfPublisher", parameters.getOrElse("path",
-      throw e(
+    val urlOfPublisher: String = parameters.getOrElse("urlofpublisher", parameters.getOrElse("path",
+      throw new IllegalArgumentException(
         s"""Please provide url of Publisher actor by specifying path
            | or .options("urlOfPublisher",...)""".stripMargin)))
 
-    val persistenceDirPath: String = parameters.getOrElse("persistenceDirPath",
+    val persistenceDirPath: String = parameters.getOrElse("persistencedirpath",
       System.getProperty("java.io.tmpdir"))
 
     val messageParserWithTimestamp = (x: String) =>
       (x, Timestamp.valueOf(AkkaStreamConstants.DATE_FORMAT.format(Calendar.getInstance().getTime)))
 
     val persistence = Persistence.getOrCreatePersistenceInstance(persistenceDirPath)
-    new AkkaStreamSource(urlOfPublisher, persistence, sqlContext, messageParserWithTimestamp)
+    new AkkaMicroBatchReader(urlOfPublisher, persistence, messageParserWithTimestamp)
   }
-
-  override def shortName(): String = "akka"
 }
 
 object Persistence {

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/LongOffset.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/LongOffset.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/LongOffset.scala
new file mode 100644
index 0000000..c961487
--- /dev/null
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/LongOffset.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.sql.streaming.akka
+
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
+
+/**
+ * @note As of 2.3.0, [[org.apache.spark.sql.execution.streaming.LongOffset]]
+ *       hasn't extended v2 Offset yet. Fix version is 3.0.0. Until then
+ *       this is a required class.
+ * @see SPARK-23092
+ */
+case class LongOffset(offset: Long) extends OffsetV2 {
+
+  override val json = offset.toString
+
+  def +(increment: Long): LongOffset = new LongOffset(offset + increment)
+  def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
+}
+
+object LongOffset {
+
+  /**
+   * LongOffset factory from serialized offset.
+   * @return new LongOffset
+   */
+  def apply(offset: SerializedOffset) : LongOffset = new LongOffset(offset.json.toLong)
+
+  /**
+   * Convert generic Offset to LongOffset if possible.
+   * @return converted LongOffset
+   */
+  def convert(offset: Offset): Option[LongOffset] = offset match {
+    case lo: LongOffset => Some(lo)
+    case so: SerializedOffset => Some(LongOffset(so))
+    case _ => None
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
index 9babd82..9b7f910 100644
--- a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
@@ -31,13 +31,13 @@ import org.apache.bahir.utils.Logging
 
 trait MessageStore {
 
-  def store[T: ClassTag](id: Int, message: T): Boolean
+  def store[T: ClassTag](id: Long, message: T): Boolean
 
-  def retrieve[T: ClassTag](start: Int, end: Int): Seq[Option[T]]
+  def retrieve[T: ClassTag](start: Long, end: Long): Seq[Option[T]]
 
-  def retrieve[T: ClassTag](id: Int): Option[T]
+  def retrieve[T: ClassTag](id: Long): Option[T]
 
-  def maxProcessedOffset: Int
+  def maxProcessedOffset: Long
 }
 
 private[akka] class LocalMessageStore(val persistentStore: RocksDB,
@@ -51,11 +51,11 @@ private[akka] class LocalMessageStore(val persistentStore: RocksDB,
 
   val serializerInstance: SerializerInstance = serializer.newInstance()
 
-  private def get(id: Int) = persistentStore.get(id.toString.getBytes)
+  private def get(id: Long) = persistentStore.get(id.toString.getBytes)
 
-  override def maxProcessedOffset: Int = persistentStore.getLatestSequenceNumber.toInt
+  override def maxProcessedOffset: Long = persistentStore.getLatestSequenceNumber
 
-  override def store[T: ClassTag](id: Int, message: T): Boolean = {
+  override def store[T: ClassTag](id: Long, message: T): Boolean = {
     val bytes: Array[Byte] = serializerInstance.serialize(message).array()
     try {
       persistentStore.put(id.toString.getBytes(), bytes)
@@ -66,11 +66,11 @@ private[akka] class LocalMessageStore(val persistentStore: RocksDB,
     }
   }
 
-  override def retrieve[T: ClassTag](start: Int, end: Int): Seq[Option[T]] = {
+  override def retrieve[T: ClassTag](start: Long, end: Long): Seq[Option[T]] = {
     (start until end).map(x => retrieve(x))
   }
 
-  override def retrieve[T: ClassTag](id: Int): Option[T] = {
+  override def retrieve[T: ClassTag](id: Long): Option[T] = {
     val bytes = persistentStore.get(id.toString.getBytes)
 
     if (bytes != null) {

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
index cdf629b..f61b067 100644
--- a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
+++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.bahir.sql.streaming.akka
 
 import java.io.File
+import java.util.Optional
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -27,7 +28,8 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
-import org.apache.spark.sql.execution.streaming.LongOffset
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.types.StructType
 
 import org.apache.bahir.utils.BahirUtils
 
@@ -126,7 +128,8 @@ class BasicAkkaSourceSuite extends AkkaStreamSourceSuite {
     val parameters = Map("persistenceDirPath" -> persistenceDirPath)
 
     intercept[IllegalArgumentException] {
-      provider.createSource(sqlContext, "", None, "", parameters)
+      provider.createMicroBatchReader(
+        Optional.empty[StructType], "", new DataSourceOptions(parameters.asJava))
     }
   }
 
@@ -145,8 +148,10 @@ class BasicAkkaSourceSuite extends AkkaStreamSourceSuite {
     val parameters = Map("urlOfPublisher" -> akkaTestUtils.getFeederActorUri(),
       "persistenceDirPath" -> persistenceDirPath)
 
-    val offset: Long = provider.createSource(sqlContext, "", None, "", parameters)
-      .getOffset.get.asInstanceOf[LongOffset].offset
+    val offset: Long = provider.createMicroBatchReader(
+      Optional.empty[StructType], "", new DataSourceOptions(parameters.asJava))
+          .asInstanceOf[AkkaMicroBatchReader]
+          .getCurrentOffset.offset
     assert(offset === 100L)
   }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
index 9cbfc32..f494c0d 100644
--- a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
+++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
@@ -21,6 +21,8 @@ package org.apache.bahir.sql.streaming.akka
 import java.io.File
 
 import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
 import scala.util.Random
 
 import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem, Props}
@@ -84,8 +86,7 @@ class AkkaTestUtils extends Logging {
   }
 
   def shutdown(): Unit = {
-//    actorSystem.awaitTermination()
-    actorSystem.shutdown()
+    Await.ready(actorSystem.terminate(), 5.seconds)
   }
 
   def setMessage(message: String): Unit = this.message = message

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala b/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
index 3a06da8..09df4ba 100644
--- a/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
+++ b/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
@@ -15,10 +15,12 @@
  * limitations under the License.
  */
 
-// scalastyle:off println
+// scalastyle:off println awaitresult
 package org.apache.spark.examples.streaming.akka
 
 import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
 import scala.util.Random
 
 import akka.actor.{Props, _}
@@ -118,7 +120,7 @@ object FeederActor {
 
     println("Feeder started as:" + feeder)
 
-    actorSystem.awaitTermination()
+    Await.result(actorSystem.whenTerminated, Duration.Inf)
   }
 }
 
@@ -182,4 +184,4 @@ object ActorWordCount {
     ssc.awaitTermination()
   }
 }
-// scalastyle:on println
+// scalastyle:on println awaitresult

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index d30e380..619bde2 100644
--- a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.akka
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.concurrent.Future
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.reflect.ClassTag
@@ -302,7 +302,6 @@ private[akka] class ActorReceiverSupervisor[T: ClassTag](
 
   def onStop(): Unit = {
     actorSupervisor ! PoisonPill
-    actorSystem.shutdown()
-    actorSystem.awaitTermination()
+    Await.ready(actorSystem.terminate(), Duration.Inf)
   }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
index e52bf0e..f403afa 100644
--- a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
+++ b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.akka
 import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.JavaConverters._
+import scala.concurrent.Await
 import scala.concurrent.duration._
 
 import akka.actor._
@@ -42,8 +43,7 @@ class AkkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
       ssc = null
     }
     if (actorSystem != null) {
-      actorSystem.shutdown()
-      actorSystem.awaitTermination(30.seconds)
+      Await.ready(actorSystem.terminate(), 30.seconds)
       actorSystem = null
     }
   }

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
index 535d0fc..00fd815 100644
--- a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
+++ b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
@@ -15,9 +15,11 @@
  * limitations under the License.
  */
 
-// scalastyle:off println
+// scalastyle:off println awaitresult
 package org.apache.spark.examples.streaming.zeromq
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
 import scala.language.implicitConversions
 
 import akka.actor.ActorSystem
@@ -53,7 +55,7 @@ object SimpleZeroMQPublisher {
       Thread.sleep(1000)
       pubSocket ! ZMQMessage(ByteString(topic) :: messages)
     }
-    acs.awaitTermination()
+    Await.result(acs.whenTerminated, Duration.Inf)
   }
 }
 
@@ -114,4 +116,4 @@ object ZeroMQWordCount {
     ssc.awaitTermination()
   }
 }
-// scalastyle:on println
+// scalastyle:on println awaitresult