You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2018/11/08 03:38:03 UTC
bahir git commit: [BAHIR-166] Migrate akka sql streaming source to DataSource v2 API
Repository: bahir
Updated Branches:
refs/heads/master b3902bac6 -> be1effaaf
[BAHIR-166] Migrate akka sql streaming source to DataSource v2 API
Migrate akka sql streaming source to DataSource v2 API.
Closes #67
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/be1effaa
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/be1effaa
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/be1effaa
Branch: refs/heads/master
Commit: be1effaaf7cfde28d19e032e038694e01fbf169b
Parents: b3902ba
Author: shimamoto <sh...@apache.org>
Authored: Thu May 31 17:15:04 2018 +0900
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Nov 7 19:28:11 2018 -0800
----------------------------------------------------------------------
pom.xml | 8 +-
sql-streaming-akka/README.md | 2 +-
.../sql/streaming/akka/AkkaStreamSource.scala | 183 ++++++++++++-------
.../bahir/sql/streaming/akka/LongOffset.scala | 54 ++++++
.../bahir/sql/streaming/akka/MessageStore.scala | 18 +-
.../streaming/akka/AkkaStreamSourceSuite.scala | 13 +-
.../sql/streaming/akka/AkkaTestUtils.scala | 5 +-
.../streaming/akka/ActorWordCount.scala | 8 +-
.../spark/streaming/akka/ActorReceiver.scala | 5 +-
.../spark/streaming/akka/AkkaStreamSuite.scala | 4 +-
.../streaming/zeromq/ZeroMQWordCount.scala | 8 +-
11 files changed, 214 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8282346..13407bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,7 @@
<modules>
<module>sql-cloudant</module>
<module>streaming-akka</module>
- <!-- <module>sql-streaming-akka</module> Disabling akka sql module, until it is updated to run with datasource v2 API. -->
+ <module>sql-streaming-akka</module>
<module>streaming-mqtt</module>
<module>sql-streaming-mqtt</module>
<module>streaming-twitter</module>
@@ -105,7 +105,7 @@
<mqtt.paho.client>1.1.0</mqtt.paho.client>
<!-- Streaming Akka connector -->
<akka.group>com.typesafe.akka</akka.group>
- <akka.version>2.4.20</akka.version>
+ <akka.version>2.5.12</akka.version>
<akka_zeromq.version>2.3.16</akka_zeromq.version>
<protobuf.version>2.5.0</protobuf.version>
@@ -569,7 +569,7 @@
<include>**/*Suite.java</include>
</includes>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <argLine>-ea -Xmx3g -Xss4m -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
+ <argLine>-Xmx3g -Xss4096k -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
<environmentVariables>
<!--
Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes
@@ -1042,6 +1042,7 @@
</modules>
</profile>
+ <!--
<profile>
<id>scala-2.10</id>
<activation>
@@ -1124,6 +1125,7 @@
</plugins>
</build>
</profile>
+ -->
<profile>
<id>test-java-home</id>
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/README.md
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/README.md b/sql-streaming-akka/README.md
index 3d9d17b..29685ee 100644
--- a/sql-streaming-akka/README.md
+++ b/sql-streaming-akka/README.md
@@ -45,7 +45,7 @@ Setting values for option `persistenceDirPath` helps in recovering in case of a
## Configuration options.
-This source uses [Akka Actor api](http://doc.akka.io/api/akka/2.4/akka/actor/Actor.html).
+This source uses [Akka Actor api](http://doc.akka.io/api/akka/2.5/akka/actor/Actor.html).
* `urlOfPublisher` The url of Publisher or Feeder actor that the Receiver actor connects to. Set this as the tcp url of the Publisher or Feeder actor.
* `persistenceDirPath` By default it is used for storing incoming messages on disk.
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
index 96d892f..3f2101c 100644
--- a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
@@ -20,16 +20,18 @@ package org.apache.bahir.sql.streaming.akka
import java.nio.ByteBuffer
import java.sql.Timestamp
import java.text.SimpleDateFormat
-import java.util.{Calendar, Objects}
+import java.util
+import java.util.{Calendar, Objects, Optional}
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
-import scala.concurrent.Future
import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
import akka.actor._
import akka.actor.SupervisorStrategy.{Escalate, Restart}
@@ -38,9 +40,12 @@ import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.rocksdb.{Options, RocksDB}
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
-import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.apache.bahir.utils.Logging
@@ -87,32 +92,29 @@ private[akka] case class IteratorData(iterator: Iterator[String]) extends ActorR
private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
private[akka] object Ack extends ActorReceiverData
-class AkkaStreamSource(urlOfPublisher: String,
- persistence: RocksDB, sqlContext: SQLContext,
- messageParser: String => (String, Timestamp))
- extends Source with Logging {
+class AkkaMicroBatchReader(urlOfPublisher: String,
+ persistence: RocksDB,
+ messageParser: String => (String, Timestamp))
+ extends MicroBatchReader with Logging {
- override def schema: StructType = AkkaStreamConstants.SCHEMA_DEFAULT
+ private val store = new LocalMessageStore(persistence, SparkEnv.get.conf)
- private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
-
- private val messages = new TrieMap[Int, (String, Timestamp)]()
+ private val messages = new TrieMap[Long, (String, Timestamp)]()
private val initLock = new CountDownLatch(1)
- private var offset = 0
+ @GuardedBy("this")
+ private var currentOffset: LongOffset = LongOffset(-1L)
+
+ @GuardedBy("this")
+ private var lastOffsetCommitted: LongOffset = LongOffset(-1L)
+
+ private var startOffset: Offset = _
+ private var endOffset: Offset = _
private var actorSystem: ActorSystem = _
private var actorSupervisor: ActorRef = _
- private def fetchLastProcessedOffset(): Int = {
- Try(store.maxProcessedOffset) match {
- case Success(x) =>
- log.info(s"Recovering from last stored offset $x")
- x
- case Failure(e) => 0
- }
- }
initialize()
private def initialize(): Unit = {
@@ -157,7 +159,7 @@ class AkkaStreamSource(urlOfPublisher: String,
case data =>
initLock.await()
- var temp = offset + 1
+ var temp = currentOffset.offset + 1
data match {
case IteratorData(iterator) =>
@@ -198,80 +200,133 @@ class AkkaStreamSource(urlOfPublisher: String,
val workers = context.children
sender() ! Statistics(n.get(), workers.size, hiccups.get(), workers.mkString("\n"))
}
- offset = temp
+ currentOffset = LongOffset(temp)
}
}
actorSystem = AkkaStreamConstants.defaultActorSystemCreator()
actorSupervisor = actorSystem.actorOf(Props(new Supervisor), "Supervisor")
- offset = fetchLastProcessedOffset()
+ if (store.maxProcessedOffset > 0) {
+ currentOffset = LongOffset(store.maxProcessedOffset)
+ }
initLock.countDown()
}
+ // This method is only used for unit test
+ private[akka] def getCurrentOffset: LongOffset = {
+ currentOffset.copy()
+ }
+
+
+ override def getEndOffset: Offset = {
+ Option(endOffset).getOrElse(throw new IllegalStateException("end offset not set"))
+ }
+
+ override def getStartOffset: Offset = {
+ Option(startOffset).getOrElse(throw new IllegalStateException("start offset not set"))
+ }
+
+ override def setOffsetRange(start: Optional[Offset],
+ end: Optional[Offset]): Unit = synchronized {
+ startOffset = start.orElse(LongOffset(-1L))
+ endOffset = end.orElse(currentOffset)
+ }
+
+ override def commit(end: Offset): Unit = synchronized {
+ val newOffset = LongOffset.convert(end).getOrElse(
+ sys.error(s"AkkaMicroBatchReader.commit() received an offset ($end) that did not " +
+ s"originate with an instance of this class")
+ )
+
+ val offsetDiff = newOffset.offset - lastOffsetCommitted.offset
+
+ if (offsetDiff < 0) {
+ sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+ }
+
+ (lastOffsetCommitted.offset until newOffset.offset).foreach { x =>
+ messages.remove(x + 1)
+ }
+ lastOffsetCommitted = newOffset
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ LongOffset(json.toLong)
+ }
+
override def stop(): Unit = {
actorSupervisor ! PoisonPill
Persistence.close()
- actorSystem.shutdown()
- actorSystem.awaitTermination()
+ Await.ready(actorSystem.terminate(), Duration.Inf)
}
- override def getOffset: Option[Offset] = {
- if (offset == 0) {
- None
- } else {
- Some(LongOffset(offset))
+ override def readSchema(): StructType = AkkaStreamConstants.SCHEMA_DEFAULT
+
+ override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {
+ assert(startOffset != null && endOffset != null,
+ "start offset and end offset should already be set before create read tasks.")
+
+ val (start, end) = synchronized {
+ (LongOffset.convert(startOffset).get.offset + 1, LongOffset.convert(endOffset).get.offset + 1)
+ }
+ val rawList = for (i <- start until end) yield {
+ store.store(i, messages(i))
+ messages(i)
+ }
+
+ val numPartitions = SparkSession.getActiveSession.get.sparkContext.defaultParallelism
+
+ val slices = Array.fill(numPartitions)(new ArrayBuffer[(String, Timestamp)])
+ rawList.zipWithIndex.foreach { case (r, idx) =>
+ slices(idx % numPartitions).append(r)
}
- }
- override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
- val startIndex = start.getOrElse(LongOffset(0L)).asInstanceOf[LongOffset].offset.toInt
- val endIndex = end.asInstanceOf[LongOffset].offset.toInt
- val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty
+ (0 until numPartitions).map { i =>
+ val slice = slices(i)
+ new DataReaderFactory[Row] {
+ override def createDataReader(): DataReader[Row] = new DataReader[Row] {
+ private var currentIdx = -1
- ((startIndex + 1) to endIndex).foreach { id =>
- val element: (String, Timestamp) = messages.getOrElse(id,
- store.retrieve[(String, Timestamp)](id).orNull)
+ override def next(): Boolean = {
+ currentIdx += 1
+ currentIdx < slice.size
+ }
- if (!Objects.isNull(element)) {
- data += element
- store.store(id, element)
+ override def get(): Row = {
+ Row.fromTuple(slice(currentIdx))
+ }
+
+ override def close(): Unit = {}
+ }
}
- messages.remove(id, element)
- }
- log.trace(s"Get Batch invoked, ${data.mkString}")
- import sqlContext.implicits._
- data.toDF("value", "timestamp")
+ }.toList.asJava
}
-}
-class AkkaStreamSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
+}
- override def sourceSchema(sqlContext: SQLContext, schema: Option[StructType],
- providerName: String, parameters: Map[String, String])
- : (String, StructType) = ("akka", AkkaStreamConstants.SCHEMA_DEFAULT)
+class AkkaStreamSourceProvider extends MicroBatchReadSupport with DataSourceRegister with Logging {
- override def createSource(sqlContext: SQLContext, metadataPath: String,
- schema: Option[StructType], providerName: String,
- parameters: Map[String, String]): Source = {
+ override def shortName(): String = "akka"
- def e(s: String) = new IllegalArgumentException(s)
+ override def createMicroBatchReader(schema: Optional[StructType],
+ metadataPath: String,
+ options: DataSourceOptions): MicroBatchReader = {
+ val parameters = options.asMap().asScala.toMap
- val urlOfPublisher: String = parameters.getOrElse("urlOfPublisher", parameters.getOrElse("path",
- throw e(
+ val urlOfPublisher: String = parameters.getOrElse("urlofpublisher", parameters.getOrElse("path",
+ throw new IllegalArgumentException(
s"""Please provide url of Publisher actor by specifying path
| or .options("urlOfPublisher",...)""".stripMargin)))
- val persistenceDirPath: String = parameters.getOrElse("persistenceDirPath",
+ val persistenceDirPath: String = parameters.getOrElse("persistencedirpath",
System.getProperty("java.io.tmpdir"))
val messageParserWithTimestamp = (x: String) =>
(x, Timestamp.valueOf(AkkaStreamConstants.DATE_FORMAT.format(Calendar.getInstance().getTime)))
val persistence = Persistence.getOrCreatePersistenceInstance(persistenceDirPath)
- new AkkaStreamSource(urlOfPublisher, persistence, sqlContext, messageParserWithTimestamp)
+ new AkkaMicroBatchReader(urlOfPublisher, persistence, messageParserWithTimestamp)
}
-
- override def shortName(): String = "akka"
}
object Persistence {
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/LongOffset.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/LongOffset.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/LongOffset.scala
new file mode 100644
index 0000000..c961487
--- /dev/null
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/LongOffset.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.sql.streaming.akka
+
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
+
+/**
+ * @note As of 2.3.0, [[org.apache.spark.sql.execution.streaming.LongOffset]]
+ * hasn't extended v2 Offset yet. Fix version is 3.0.0. Until then
+ * this is a required class.
+ * @see SPARK-23092
+ */
+case class LongOffset(offset: Long) extends OffsetV2 {
+
+ override val json = offset.toString
+
+ def +(increment: Long): LongOffset = new LongOffset(offset + increment)
+ def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
+}
+
+object LongOffset {
+
+ /**
+ * LongOffset factory from serialized offset.
+ * @return new LongOffset
+ */
+ def apply(offset: SerializedOffset) : LongOffset = new LongOffset(offset.json.toLong)
+
+ /**
+ * Convert generic Offset to LongOffset if possible.
+ * @return converted LongOffset
+ */
+ def convert(offset: Offset): Option[LongOffset] = offset match {
+ case lo: LongOffset => Some(lo)
+ case so: SerializedOffset => Some(LongOffset(so))
+ case _ => None
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
index 9babd82..9b7f910 100644
--- a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
@@ -31,13 +31,13 @@ import org.apache.bahir.utils.Logging
trait MessageStore {
- def store[T: ClassTag](id: Int, message: T): Boolean
+ def store[T: ClassTag](id: Long, message: T): Boolean
- def retrieve[T: ClassTag](start: Int, end: Int): Seq[Option[T]]
+ def retrieve[T: ClassTag](start: Long, end: Long): Seq[Option[T]]
- def retrieve[T: ClassTag](id: Int): Option[T]
+ def retrieve[T: ClassTag](id: Long): Option[T]
- def maxProcessedOffset: Int
+ def maxProcessedOffset: Long
}
private[akka] class LocalMessageStore(val persistentStore: RocksDB,
@@ -51,11 +51,11 @@ private[akka] class LocalMessageStore(val persistentStore: RocksDB,
val serializerInstance: SerializerInstance = serializer.newInstance()
- private def get(id: Int) = persistentStore.get(id.toString.getBytes)
+ private def get(id: Long) = persistentStore.get(id.toString.getBytes)
- override def maxProcessedOffset: Int = persistentStore.getLatestSequenceNumber.toInt
+ override def maxProcessedOffset: Long = persistentStore.getLatestSequenceNumber
- override def store[T: ClassTag](id: Int, message: T): Boolean = {
+ override def store[T: ClassTag](id: Long, message: T): Boolean = {
val bytes: Array[Byte] = serializerInstance.serialize(message).array()
try {
persistentStore.put(id.toString.getBytes(), bytes)
@@ -66,11 +66,11 @@ private[akka] class LocalMessageStore(val persistentStore: RocksDB,
}
}
- override def retrieve[T: ClassTag](start: Int, end: Int): Seq[Option[T]] = {
+ override def retrieve[T: ClassTag](start: Long, end: Long): Seq[Option[T]] = {
(start until end).map(x => retrieve(x))
}
- override def retrieve[T: ClassTag](id: Int): Option[T] = {
+ override def retrieve[T: ClassTag](id: Long): Option[T] = {
val bytes = persistentStore.get(id.toString.getBytes)
if (bytes != null) {
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
index cdf629b..f61b067 100644
--- a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
+++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
@@ -18,6 +18,7 @@
package org.apache.bahir.sql.streaming.akka
import java.io.File
+import java.util.Optional
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -27,7 +28,8 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
-import org.apache.spark.sql.execution.streaming.LongOffset
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.types.StructType
import org.apache.bahir.utils.BahirUtils
@@ -126,7 +128,8 @@ class BasicAkkaSourceSuite extends AkkaStreamSourceSuite {
val parameters = Map("persistenceDirPath" -> persistenceDirPath)
intercept[IllegalArgumentException] {
- provider.createSource(sqlContext, "", None, "", parameters)
+ provider.createMicroBatchReader(
+ Optional.empty[StructType], "", new DataSourceOptions(parameters.asJava))
}
}
@@ -145,8 +148,10 @@ class BasicAkkaSourceSuite extends AkkaStreamSourceSuite {
val parameters = Map("urlOfPublisher" -> akkaTestUtils.getFeederActorUri(),
"persistenceDirPath" -> persistenceDirPath)
- val offset: Long = provider.createSource(sqlContext, "", None, "", parameters)
- .getOffset.get.asInstanceOf[LongOffset].offset
+ val offset: Long = provider.createMicroBatchReader(
+ Optional.empty[StructType], "", new DataSourceOptions(parameters.asJava))
+ .asInstanceOf[AkkaMicroBatchReader]
+ .getCurrentOffset.offset
assert(offset === 100L)
}
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
index 9cbfc32..f494c0d 100644
--- a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
+++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
@@ -21,6 +21,8 @@ package org.apache.bahir.sql.streaming.akka
import java.io.File
import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
import scala.util.Random
import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem, Props}
@@ -84,8 +86,7 @@ class AkkaTestUtils extends Logging {
}
def shutdown(): Unit = {
-// actorSystem.awaitTermination()
- actorSystem.shutdown()
+ Await.ready(actorSystem.terminate(), 5.seconds)
}
def setMessage(message: String): Unit = this.message = message
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala b/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
index 3a06da8..09df4ba 100644
--- a/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
+++ b/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
@@ -15,10 +15,12 @@
* limitations under the License.
*/
-// scalastyle:off println
+// scalastyle:off println awaitresult
package org.apache.spark.examples.streaming.akka
import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
import scala.util.Random
import akka.actor.{Props, _}
@@ -118,7 +120,7 @@ object FeederActor {
println("Feeder started as:" + feeder)
- actorSystem.awaitTermination()
+ Await.result(actorSystem.whenTerminated, Duration.Inf)
}
}
@@ -182,4 +184,4 @@ object ActorWordCount {
ssc.awaitTermination()
}
}
-// scalastyle:on println
+// scalastyle:on println awaitresult
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index d30e380..619bde2 100644
--- a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.akka
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
-import scala.concurrent.Future
+import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.ClassTag
@@ -302,7 +302,6 @@ private[akka] class ActorReceiverSupervisor[T: ClassTag](
def onStop(): Unit = {
actorSupervisor ! PoisonPill
- actorSystem.shutdown()
- actorSystem.awaitTermination()
+ Await.ready(actorSystem.terminate(), Duration.Inf)
}
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
index e52bf0e..f403afa 100644
--- a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
+++ b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.akka
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
+import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor._
@@ -42,8 +43,7 @@ class AkkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
ssc = null
}
if (actorSystem != null) {
- actorSystem.shutdown()
- actorSystem.awaitTermination(30.seconds)
+ Await.ready(actorSystem.terminate(), 30.seconds)
actorSystem = null
}
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
index 535d0fc..00fd815 100644
--- a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
+++ b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
@@ -15,9 +15,11 @@
* limitations under the License.
*/
-// scalastyle:off println
+// scalastyle:off println awaitresult
package org.apache.spark.examples.streaming.zeromq
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import akka.actor.ActorSystem
@@ -53,7 +55,7 @@ object SimpleZeroMQPublisher {
Thread.sleep(1000)
pubSocket ! ZMQMessage(ByteString(topic) :: messages)
}
- acs.awaitTermination()
+ Await.result(acs.whenTerminated, Duration.Inf)
}
}
@@ -114,4 +116,4 @@ object ZeroMQWordCount {
ssc.awaitTermination()
}
}
-// scalastyle:on println
+// scalastyle:on println awaitresult