You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2017/04/06 15:07:27 UTC
bahir git commit: [BAHIR-97] Akka as SQL Streaming datasource.
Repository: bahir
Updated Branches:
refs/heads/master f0d9a84f7 -> 889de659c
[BAHIR-97] Akka as SQL Streaming datasource.
Closes #38.
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/889de659
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/889de659
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/889de659
Branch: refs/heads/master
Commit: 889de659c33dd56bad7193a4b69e6d05d061a2fd
Parents: f0d9a84
Author: Subhobrata Dey <sb...@gmail.com>
Authored: Sun Mar 26 21:30:30 2017 -0700
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu Apr 6 08:05:09 2017 -0700
----------------------------------------------------------------------
pom.xml | 1 +
sql-streaming-akka/README.md | 111 +++++++
.../streaming/akka/JavaAkkaStreamWordCount.java | 95 ++++++
.../streaming/akka/AkkaStreamWordCount.scala | 72 +++++
sql-streaming-akka/pom.xml | 120 ++++++++
.../src/main/assembly/assembly.xml | 44 +++
.../sql/streaming/akka/AkkaStreamSource.scala | 294 +++++++++++++++++++
.../bahir/sql/streaming/akka/MessageStore.scala | 83 ++++++
.../org/apache/bahir/utils/BahirUtils.scala | 47 +++
.../scala/org/apache/bahir/utils/Logging.scala | 24 ++
.../src/test/resources/feeder_actor.conf | 34 +++
.../src/test/resources/log4j.properties | 27 ++
.../streaming/akka/AkkaStreamSourceSuite.scala | 191 ++++++++++++
.../sql/streaming/akka/AkkaTestUtils.scala | 93 ++++++
14 files changed, 1236 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 73cac1f..65129cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
<modules>
<module>sql-cloudant</module>
<module>streaming-akka</module>
+ <module>sql-streaming-akka</module>
<module>streaming-mqtt</module>
<module>sql-streaming-mqtt</module>
<module>streaming-twitter</module>
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/README.md
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/README.md b/sql-streaming-akka/README.md
new file mode 100644
index 0000000..b64a8e2
--- /dev/null
+++ b/sql-streaming-akka/README.md
@@ -0,0 +1,111 @@
+A library for reading data from Akka Actors using Spark SQL Streaming (also known as Structured Streaming).
+
+## Linking
+
+Using SBT:
+
+ libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-akka" % "2.2.0-SNAPSHOT"
+
+Using Maven:
+
+ <dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>spark-sql-streaming-akka_2.11</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+ $ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-akka_2.11:2.2.0-SNAPSHOT
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
+
+## Examples
+
+A SQL stream can be created from the data received from an Akka feeder actor as follows:
+
+ sqlContext.readStream
+ .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+ .option("urlOfPublisher", "feederActorUri")
+ .load()
+
+## Enable recovering from failures.
+
+Setting the `persistenceDirPath` option enables recovery after a restart: the source restores the state it had reached before the shutdown and continues from there.
+
+ sqlContext.readStream
+ .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+ .option("urlOfPublisher", "feederActorUri")
+ .option("persistenceDirPath", "/path/to/localdir")
+ .load()
+
+## Configuration options.
+
+This source uses [Akka Actor api](http://doc.akka.io/api/akka/2.4/akka/actor/Actor.html).
+
+* `urlOfPublisher` The url of Publisher or Feeder actor that the Receiver actor connects to. Set this as the tcp url of the Publisher or Feeder actor.
+* `persistenceDirPath` Directory used for storing incoming messages on disk. When it is not set, the JVM temporary directory (the `java.io.tmpdir` system property) is used.
+
+### Scala API
+
+An example that uses the Scala API to count words in the incoming message stream.
+
+ // Create DataFrame representing the stream of input lines from connection
+ // to publisher or feeder actor
+ val lines = spark.readStream
+ .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+ .option("urlOfPublisher", urlOfPublisher)
+ .load().as[(String, Timestamp)]
+
+ // Split the lines into words
+ val words = lines.map(_._1).flatMap(_.split(" "))
+
+ // Generate running word count
+ val wordCounts = words.groupBy("value").count()
+
+ // Start running the query that prints the running counts to the console
+ val query = wordCounts.writeStream
+ .outputMode("complete")
+ .format("console")
+ .start()
+
+ query.awaitTermination()
+
+Please see `AkkaStreamWordCount.scala` for full example.
+
+### Java API
+
+An example that uses the Java API to count words in the incoming message stream.
+
+ // Create DataFrame representing the stream of input lines from connection
+ // to publisher or feeder actor
+ Dataset<String> lines = spark
+ .readStream()
+ .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+ .option("urlOfPublisher", urlOfPublisher)
+ .load().select("value").as(Encoders.STRING());
+
+ // Split the lines into words
+ Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterator<String> call(String s) throws Exception {
+ return Arrays.asList(s.split(" ")).iterator();
+ }
+ }, Encoders.STRING());
+
+ // Generate running word count
+ Dataset<Row> wordCounts = words.groupBy("value").count();
+
+ // Start running the query that prints the running counts to the console
+ StreamingQuery query = wordCounts.writeStream()
+ .outputMode("complete")
+ .format("console")
+ .start();
+
+ query.awaitTermination();
+
+Please see `JavaAkkaStreamWordCount.java` for full example.
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/examples/src/main/java/org/apache/bahir/examples/sql/streaming/akka/JavaAkkaStreamWordCount.java
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/examples/src/main/java/org/apache/bahir/examples/sql/streaming/akka/JavaAkkaStreamWordCount.java b/sql-streaming-akka/examples/src/main/java/org/apache/bahir/examples/sql/streaming/akka/JavaAkkaStreamWordCount.java
new file mode 100644
index 0000000..59146ae
--- /dev/null
+++ b/sql-streaming-akka/examples/src/main/java/org/apache/bahir/examples/sql/streaming/akka/JavaAkkaStreamWordCount.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.examples.sql.streaming.akka;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from Akka Feeder Actor system.
+ *
+ * Usage: JavaAkkaStreamWordCount <urlOfPublisher>
+ * <urlOfPublisher> is the uri of the publisher or feeder actor that Structured Streaming
+ * connects to in order to receive data.
+ *
+ * A Feeder Actor System must already be up and running before this example is started
+ * on your local machine.
+ */
+public final class JavaAkkaStreamWordCount {
+
+  /**
+   * Entry point: builds the streaming word count query and blocks until it terminates.
+   *
+   * @param args command line arguments; args[0] is the uri of the publisher actor
+   * @throws Exception if the streaming query fails
+   */
+  public static void main(String[] args) throws Exception {
+    // At least one argument (the publisher uri) is required.
+    if (args.length < 1) {
+      System.err.println("Usage: JavaAkkaStreamWordCount <urlOfPublisher>");
+      System.exit(1);
+    }
+
+    // Lower the log level only when no log4j appender has been configured.
+    final Logger rootLogger = Logger.getRootLogger();
+    if (!rootLogger.getAllAppenders().hasMoreElements()) {
+      rootLogger.setLevel(Level.WARN);
+    }
+
+    final String urlOfPublisher = args[0];
+
+    // Use the configured master URL if there is one, otherwise run locally with 4 threads.
+    final SparkConf sparkConf = new SparkConf()
+        .setAppName("JavaAkkaStreamWordCount")
+        .setIfMissing("spark.master", "local[4]");
+
+    final SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
+
+    // Splits one input line into its space separated words.
+    final FlatMapFunction<String, String> splitIntoWords =
+        new FlatMapFunction<String, String>() {
+          @Override
+          public Iterator<String> call(String line) throws Exception {
+            return Arrays.asList(line.split(" ")).iterator();
+          }
+        };
+
+    // Stream of input lines coming from the publisher or feeder actor.
+    final Dataset<String> lines = spark
+        .readStream()
+        .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+        .option("urlOfPublisher", urlOfPublisher)
+        .load()
+        .select("value")
+        .as(Encoders.STRING());
+
+    final Dataset<String> words = lines.flatMap(splitIntoWords, Encoders.STRING());
+
+    // Running count per distinct word.
+    final Dataset<Row> wordCounts = words.groupBy("value").count();
+
+    // Print the complete running counts to the console after every trigger.
+    final StreamingQuery query = wordCounts.writeStream()
+        .outputMode("complete")
+        .format("console")
+        .start();
+
+    query.awaitTermination();
+  }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/akka/AkkaStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/akka/AkkaStreamWordCount.scala b/sql-streaming-akka/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/akka/AkkaStreamWordCount.scala
new file mode 100644
index 0000000..8c4185a
--- /dev/null
+++ b/sql-streaming-akka/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/akka/AkkaStreamWordCount.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.examples.sql.streaming.akka
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from Akka Feeder Actor system.
+ *
+ * Usage: AkkaStreamWordCount <urlOfPublisher>
+ * <urlOfPublisher> provides the uri of the publisher or feeder actor that Structured Streaming
+ * would connect to receive data.
+ *
+ * To run this on your local machine, a Feeder Actor System should be up and running.
+ *
+ * The master URL is taken from the Spark configuration when it is set there and only
+ * falls back to `local[4]` otherwise, matching the Java example.
+ */
+object AkkaStreamWordCount {
+  def main(args: Array[String]): Unit = {
+    import org.apache.spark.SparkConf
+
+    if (args.length < 1) {
+      // scalastyle:off println
+      System.err.println("Usage: AkkaStreamWordCount <urlOfPublisher>")
+      // scalastyle:on println
+      System.exit(1)
+    }
+
+    val urlOfPublisher = args(0)
+
+    // A master hard-coded on the builder would override an externally supplied one,
+    // so only default it when the configuration does not already contain it.
+    val sparkConf = new SparkConf().setAppName("AkkaStreamWordCount")
+    if (!sparkConf.contains("spark.master")) {
+      sparkConf.setMaster("local[4]")
+    }
+
+    val spark = SparkSession
+      .builder()
+      .config(sparkConf)
+      .getOrCreate()
+
+    import spark.implicits._
+
+    // Create DataFrame representing the stream of input lines from connection
+    // to publisher or feeder actor
+    val lines = spark.readStream
+      .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+      .option("urlOfPublisher", urlOfPublisher)
+      .load().as[(String, Timestamp)]
+
+    // Split the lines into words
+    val words = lines.map(_._1).flatMap(_.split(" "))
+
+    // Generate running word count
+    val wordCounts = words.groupBy("value").count()
+
+    // Start running the query that prints the running counts to the console
+    val query = wordCounts.writeStream
+      .outputMode("complete")
+      .format("console")
+      .start()
+
+    query.awaitTermination()
+  }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/pom.xml
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/pom.xml b/sql-streaming-akka/pom.xml
new file mode 100644
index 0000000..4d7040b
--- /dev/null
+++ b/sql-streaming-akka/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-parent_2.11</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>spark-sql-streaming-akka_2.11</artifactId>
+ <properties>
+ <sbt.project.name>sql-streaming-akka</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Apache Bahir - Spark SQL Streaming Akka</name>
+ <url>http://bahir.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-actor_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-remote_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ <version>5.1.2</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
+
+ <!-- Assemble a jar with test dependencies for Python tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test-jar-with-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <!-- Make sure the file path is same as the sbt build -->
+ <finalName>spark-streaming-akka-test-${project.version}</finalName>
+ <outputDirectory>${project.build.directory}/scala-${scala.binary.version}</outputDirectory>
+ <appendAssemblyId>false</appendAssemblyId>
+ <!-- Don't publish it since it's only for Python tests -->
+ <attach>false</attach>
+ <descriptors>
+ <descriptor>src/main/assembly/assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/assembly/assembly.xml b/sql-streaming-akka/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..58a95a0
--- /dev/null
+++ b/sql-streaming-akka/src/main/assembly/assembly.xml
@@ -0,0 +1,44 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<assembly>
+ <id>test-jar-with-dependencies</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.directory}/scala-${scala.binary.version}/test-classes</directory>
+ <outputDirectory></outputDirectory>
+ </fileSet>
+ </fileSets>
+
+ <dependencySets>
+ <dependencySet>
+ <useTransitiveDependencies>true</useTransitiveDependencies>
+ <scope>test</scope>
+ <unpack>true</unpack>
+ <excludes>
+ <exclude>org.apache.hadoop:*:jar</exclude>
+ <exclude>org.apache.zookeeper:*:jar</exclude>
+ <exclude>org.apache.avro:*:jar</exclude>
+ </excludes>
+ </dependencySet>
+ </dependencySets>
+
+</assembly>
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
new file mode 100644
index 0000000..96d892f
--- /dev/null
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.sql.streaming.akka
+
+import java.nio.ByteBuffer
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, Objects}
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.concurrent.Future
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+
+import akka.actor._
+import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import akka.pattern.ask
+import akka.util.Timeout
+import com.typesafe.config.ConfigFactory
+import org.rocksdb.{Options, RocksDB}
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
+import org.apache.bahir.utils.Logging
+
+/**
+ * Shared constants and default factories used by the Akka structured streaming source.
+ */
+object AkkaStreamConstants {
+
+ // Schema of the rows produced by the source: the message text and a timestamp.
+ val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
+ :: StructField("timestamp", TimestampType) :: Nil)
+
+ // Date pattern with second resolution.
+ // NOTE(review): SimpleDateFormat is not thread-safe and this single instance is shared
+ // by everything that references it - confirm it is never used from two threads at once.
+ val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+
+ // Supervision for child actors: a child failing with a RuntimeException is restarted,
+ // any other Exception is escalated; at most 10 retries within a 15 millisecond window.
+ // NOTE(review): a 15 ms window is very short - confirm the intended time unit.
+ val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+ 15 millis) {
+ case _: RuntimeException => Restart
+ case _: Exception => Escalate
+ }
+
+ // Factory for the actor system that hosts the receiver: always named
+ // "streaming-actor-system", configured with the remote actor ref provider, the netty tcp
+ // transport on port "0" (presumably an automatically chosen port - confirm), the slf4j
+ // logger, and dead-letter logging during shutdown switched off.
+ val defaultActorSystemCreator: () => ActorSystem = () => {
+// val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
+ val uniqueSystemName = s"streaming-actor-system"
+ val akkaConf = ConfigFactory.parseString(
+ s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+ |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+ |akka.remote.netty.tcp.port = "0"
+ |akka.loggers.0 = "akka.event.slf4j.Slf4jLogger"
+ |akka.log-dead-letters-during-shutdown = "off"
+ """.stripMargin)
+ ActorSystem(uniqueSystemName, akkaConf)
+ }
+}
+
+/** Sent by the receiver actor to the publisher when the receiver starts, to subscribe. */
+case class SubscribeReceiver(receiverActor: ActorRef)
+/** Sent by the receiver actor to the publisher when the receiver stops, to unsubscribe. */
+case class UnsubscribeReceiver(receiverActor: ActorRef)
+
+/**
+ * Counters the supervisor actor replies with when it is sent a `Statistics` message.
+ *
+ * @param numberOfMsgs number of single-item messages the supervisor has stored
+ * @param numberOfWorkers number of child actors of the supervisor
+ * @param numberOfHiccups number of `PossiblyHarmful` messages the supervisor has seen
+ * @param otherInfo the supervisor's children, one per line
+ */
+case class Statistics(numberOfMsgs: Int,
+ numberOfWorkers: Int,
+ numberOfHiccups: Int,
+ otherInfo: String)
+
+/** Messages exchanged between a receiver actor and its supervisor. */
+private[akka] sealed trait ActorReceiverData
+/** A single message to be stored; no reply is sent. */
+private[akka] case class SingleItemData(item: String) extends ActorReceiverData
+/** A single message to be stored; the supervisor replies with `Ack` once it is stored. */
+private[akka] case class AskStoreSingleItemData(item: String) extends ActorReceiverData
+/** Several messages to be stored, in iteration order. */
+private[akka] case class IteratorData(iterator: Iterator[String]) extends ActorReceiverData
+/** A message delivered as raw bytes; the supervisor turns it into a String before storing. */
+private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
+/** Reply confirming that an `AskStoreSingleItemData` message has been stored. */
+private[akka] object Ack extends ActorReceiverData
+
+/**
+ * Structured Streaming source that subscribes to a remote Akka publisher (feeder) actor
+ * and exposes every received message as a `(value, timestamp)` row.
+ *
+ * Received messages are buffered in memory under consecutive integer ids that continue
+ * from the offset reported by the message store. `getBatch` serves ids from the buffer
+ * (falling back to the store) and writes every served element to the store.
+ *
+ * @param urlOfPublisher actor path of the publisher the receiver actor subscribes to
+ * @param persistence RocksDB handle backing the local message store
+ * @param sqlContext context used to read the Spark configuration and build DataFrames
+ * @param messageParser turns a raw message into the `(value, timestamp)` pair that is stored
+ */
+class AkkaStreamSource(urlOfPublisher: String,
+    persistence: RocksDB, sqlContext: SQLContext,
+    messageParser: String => (String, Timestamp))
+  extends Source with Logging {
+
+  override def schema: StructType = AkkaStreamConstants.SCHEMA_DEFAULT
+
+  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
+
+  // In-memory buffer of received messages, keyed by the id (offset) assigned to them.
+  private val messages = new TrieMap[Int, (String, Timestamp)]()
+
+  // Released once `offset` has been restored, so the supervisor never assigns ids early.
+  private val initLock = new CountDownLatch(1)
+
+  // Id of the most recently buffered message, 0 when there is none. It is written by the
+  // supervisor actor and read by getOffset, which the streaming engine may call from a
+  // different thread, hence volatile.
+  @volatile private var offset = 0
+
+  private var actorSystem: ActorSystem = _
+  private var actorSupervisor: ActorRef = _
+
+  /**
+   * Reads the offset to resume from out of the message store.
+   *
+   * @return the stored offset, or 0 when it cannot be read
+   */
+  private def fetchLastProcessedOffset(): Int = {
+    Try(store.maxProcessedOffset) match {
+      case Success(x) =>
+        log.info(s"Recovering from last stored offset $x")
+        x
+      case Failure(e) =>
+        // Starting from scratch is the deliberate fallback; just do not do it silently.
+        log.info(s"Could not read the last stored offset, starting from 0: ${e.getMessage}")
+        0
+    }
+  }
+
+  initialize()
+
+  /**
+   * Starts the actor system with its supervisor and receiver actors, then restores the
+   * offset and releases `initLock` so that message handling can begin.
+   */
+  private def initialize(): Unit = {
+
+    /**
+     * Subscribes to the publisher on start, forwards every String it receives to its
+     * parent (the supervisor) and unsubscribes on stop.
+     */
+    class ActorReceiver(urlOfPublisher: String) extends Actor {
+
+      lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
+
+      override def preStart(): Unit = remotePublisher ! SubscribeReceiver(context.self)
+
+      override def receive: PartialFunction[Any, Unit] = {
+        case msg: String => store(msg)
+      }
+
+      override def postStop(): Unit = remotePublisher ! UnsubscribeReceiver(context.self)
+
+      def store(iter: Iterator[String]) = {
+        context.parent ! IteratorData(iter)
+      }
+
+      def store(item: String) = {
+        context.parent ! SingleItemData(item)
+      }
+
+      def store(item: String, timeout: Timeout): Future[Unit] = {
+        context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
+      }
+    }
+
+    /**
+     * Owns the receiver actor and is the only writer of `messages` and `offset`.
+     */
+    class Supervisor extends Actor {
+      override val supervisorStrategy = AkkaStreamConstants.defaultSupervisorStrategy
+
+      private val props = Props(new ActorReceiver(urlOfPublisher))
+      private val name = "ActorReceiver"
+      private val worker = context.actorOf(props, name)
+      log.info("Started receiver actor at:" + worker.path)
+
+      private val n: AtomicInteger = new AtomicInteger(0)
+      private val hiccups: AtomicInteger = new AtomicInteger(0)
+
+      override def receive: PartialFunction[Any, Unit] = {
+
+        case data =>
+          initLock.await()
+          // Id the next stored message will get. It only moves when something is actually
+          // stored, so control messages no longer create gaps in the offset sequence.
+          var next = offset + 1
+
+          data match {
+            case IteratorData(iterator) =>
+              log.debug("received iterator")
+              iterator.foreach { record =>
+                messages.put(next, messageParser(record))
+                next += 1
+              }
+
+            case SingleItemData(msg) =>
+              log.debug("received single")
+              messages.put(next, messageParser(msg))
+              next += 1
+              n.incrementAndGet()
+
+            case AskStoreSingleItemData(msg) =>
+              log.debug("received single sync")
+              messages.put(next, messageParser(msg))
+              next += 1
+              n.incrementAndGet()
+              sender() ! Ack
+
+            case ByteBufferData(bytes) =>
+              log.debug("received bytes")
+              // Decode only the readable region of the buffer. duplicate() keeps the
+              // sender's position untouched and, unlike array(), also works for buffers
+              // without an accessible backing array. The charset is fixed to UTF-8 so
+              // the result does not depend on the platform default.
+              val readable = bytes.duplicate()
+              val raw = new Array[Byte](readable.remaining())
+              readable.get(raw)
+              messages.put(next,
+                messageParser(new String(raw, java.nio.charset.StandardCharsets.UTF_8)))
+              next += 1
+
+            case props: Props =>
+              val worker = context.actorOf(props)
+              log.info("Started receiver worker at:" + worker.path)
+              sender() ! worker
+
+            case (props: Props, name: String) =>
+              val worker = context.actorOf(props, name)
+              log.info("Started receiver worker at:" + worker.path)
+              sender() ! worker
+
+            case _: PossiblyHarmful => hiccups.incrementAndGet()
+
+            case _: Statistics =>
+              val workers = context.children
+              sender() ! Statistics(n.get(), workers.size, hiccups.get(), workers.mkString("\n"))
+          }
+          // Publish the id of the last stored message only after the buffer has been
+          // filled, so a reader never sees an offset whose message is still missing.
+          offset = next - 1
+      }
+    }
+
+    actorSystem = AkkaStreamConstants.defaultActorSystemCreator()
+    actorSupervisor = actorSystem.actorOf(Props(new Supervisor), "Supervisor")
+    offset = fetchLastProcessedOffset()
+    initLock.countDown()
+  }
+
+  /** Stops the supervisor, closes the shared persistence and shuts the actor system down. */
+  override def stop(): Unit = {
+    actorSupervisor ! PoisonPill
+    Persistence.close()
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
+  }
+
+  /** @return the id of the last buffered message, or None while nothing has been received */
+  override def getOffset: Option[Offset] = {
+    val current = offset
+    if (current == 0) {
+      None
+    } else {
+      Some(LongOffset(current))
+    }
+  }
+
+  /**
+   * Builds the batch of messages whose ids lie in `(start, end]`.
+   *
+   * Each id is looked up in the in-memory buffer first and in the message store second;
+   * ids found in neither are skipped. Every served element is written to the store and
+   * dropped from the buffer.
+   *
+   * @param start exclusive lower bound; None means "from the beginning"
+   * @param end inclusive upper bound
+   * @return a DataFrame with the columns "value" and "timestamp"
+   */
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    val startIndex = start.getOrElse(LongOffset(0L)).asInstanceOf[LongOffset].offset.toInt
+    val endIndex = end.asInstanceOf[LongOffset].offset.toInt
+    val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty
+
+    ((startIndex + 1) to endIndex).foreach { id =>
+      // The store is only consulted when the id is not buffered in memory.
+      val element: (String, Timestamp) = messages.getOrElse(id,
+        store.retrieve[(String, Timestamp)](id).orNull)
+
+      if (!Objects.isNull(element)) {
+        data += element
+        store.store(id, element)
+      }
+      messages.remove(id, element)
+    }
+    log.trace(s"Get Batch invoked, ${data.mkString}")
+    import sqlContext.implicits._
+    data.toDF("value", "timestamp")
+  }
+}
+
+/**
+ * Registers the Akka source with Spark under the short name "akka" and creates
+ * [[AkkaStreamSource]] instances from the reader options.
+ *
+ * Options:
+ *  - `urlOfPublisher` (or `path`): url of the publisher actor, required
+ *  - `persistenceDirPath`: directory of the local message store, defaults to the
+ *    `java.io.tmpdir` system property
+ */
+class AkkaStreamSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
+
+  /** @return the source name together with the fixed `(value, timestamp)` schema */
+  override def sourceSchema(sqlContext: SQLContext, schema: Option[StructType],
+      providerName: String, parameters: Map[String, String])
+  : (String, StructType) = ("akka", AkkaStreamConstants.SCHEMA_DEFAULT)
+
+  /**
+   * Creates the source for the given options.
+   *
+   * @throws IllegalArgumentException when neither `urlOfPublisher` nor `path` is given
+   */
+  override def createSource(sqlContext: SQLContext, metadataPath: String,
+      schema: Option[StructType], providerName: String,
+      parameters: Map[String, String]): Source = {
+
+    def e(s: String) = new IllegalArgumentException(s)
+
+    // `urlOfPublisher` wins over `path`; one of the two must be present.
+    val urlOfPublisher: String = parameters.get("urlOfPublisher")
+      .orElse(parameters.get("path"))
+      .getOrElse(throw e(
+        """Please provide url of Publisher actor by specifying path
+          | or .option("urlOfPublisher",...)""".stripMargin))
+
+    val persistenceDirPath: String = parameters.getOrElse("persistenceDirPath",
+      System.getProperty("java.io.tmpdir"))
+
+    // Stamp every message with its arrival time truncated to whole seconds. Computing
+    // the value arithmetically keeps the second resolution of the former format/parse
+    // round trip, but needs no shared SimpleDateFormat (which is not thread-safe) and
+    // cannot be shifted by an hour when the local wall-clock time is ambiguous
+    // during a daylight-saving overlap.
+    val messageParserWithTimestamp = (x: String) =>
+      (x, new Timestamp(System.currentTimeMillis() / 1000L * 1000L))
+
+    val persistence = Persistence.getOrCreatePersistenceInstance(persistenceDirPath)
+    new AkkaStreamSource(urlOfPublisher, persistence, sqlContext, messageParserWithTimestamp)
+  }
+
+  override def shortName(): String = "akka"
+}
+
+/**
+ * Holder of the single RocksDB handle used by the Akka streaming source.
+ *
+ * NOTE(review): the handle is shared by every caller and access is not synchronized -
+ * confirm that open and close can never run concurrently. A call with a different
+ * directory while a handle is already open returns the existing handle.
+ */
+object Persistence {
+  var persistence: RocksDB = _
+
+  // Native options handle created for `persistence`; kept so close() can release it.
+  private var options: Options = _
+
+  /**
+   * Returns the open RocksDB handle, opening it first when there is none.
+   *
+   * @param persistenceDirPath directory of the database; created if it is missing. Only
+   *                           used when no handle is open yet.
+   * @return the shared RocksDB handle
+   */
+  def getOrCreatePersistenceInstance(persistenceDirPath: String): RocksDB = {
+    if (Objects.isNull(persistence)) {
+      RocksDB.loadLibrary()
+      val opts = new Options().setCreateIfMissing(true)
+      try {
+        persistence = RocksDB.open(opts, persistenceDirPath)
+        options = opts
+      } finally {
+        // Opening failed: release the native options instead of leaking them.
+        if (Objects.isNull(persistence)) {
+          opts.close()
+        }
+      }
+    }
+    persistence
+  }
+
+  /** Closes the shared handle and its options, if any; a later call reopens the database. */
+  def close(): Unit = {
+    if (!Objects.isNull(persistence)) {
+      persistence.close()
+      persistence = null
+    }
+    // The options are released after the database that was opened with them.
+    if (!Objects.isNull(options)) {
+      options.close()
+      options = null
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
new file mode 100644
index 0000000..9babd82
--- /dev/null
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.sql.streaming.akka
+
+import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import org.rocksdb.RocksDB
+
+import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerInstance}
+import org.apache.spark.SparkConf
+
+import org.apache.bahir.utils.Logging
+
+
+/**
+ * Storage abstraction for messages addressed by an integer id.
+ *
+ * Every operation is generic in the message type `T` and requires a `ClassTag` for it.
+ */
+trait MessageStore {
+
+ /** Stores `message` under `id` and reports the outcome as a Boolean. */
+ def store[T: ClassTag](id: Int, message: T): Boolean
+
+ /**
+  * Looks up a run of ids and returns one `Option` per id.
+  * NOTE(review): `LocalMessageStore` iterates `start until end`, i.e. `end` is exclusive;
+  * confirm that this is the contract intended for every implementation.
+  */
+ def retrieve[T: ClassTag](start: Int, end: Int): Seq[Option[T]]
+
+ /** Looks up the message stored under `id`, if any. */
+ def retrieve[T: ClassTag](id: Int): Option[T]
+
+ /** Offset reported by the store as the latest one it has processed. */
+ def maxProcessedOffset: Int
+}
+
+/**
+ * [[MessageStore]] that keeps serialized messages in a RocksDB instance.
+ *
+ * Keys are the decimal representation of the message id; values are produced by a
+ * [[SerializerInstance]] obtained once from `serializer`.
+ *
+ * @param persistentStore database used for reads and writes; it is not closed by this class
+ * @param serializer      factory for the serializer instance used for values
+ */
+private[akka] class LocalMessageStore(val persistentStore: RocksDB,
+    val serializer: Serializer)
+  extends MessageStore with Logging {
+
+  // Captured at construction time and handed to the serializer on every deserialization.
+  val classLoader: ClassLoader = Thread.currentThread().getContextClassLoader
+
+  /** Convenience constructor that serializes values with a [[JavaSerializer]]. */
+  def this(persistentStore: RocksDB, conf: SparkConf) =
+    this(persistentStore, new JavaSerializer(conf))
+
+  val serializerInstance: SerializerInstance = serializer.newInstance()
+
+  // An explicit charset keeps the key bytes independent of the JVM's default encoding.
+  private def keyOf(id: Int): Array[Byte] =
+    id.toString.getBytes(java.nio.charset.StandardCharsets.UTF_8)
+
+  // Raw lookup; RocksDB returns null when the key is absent.
+  private def get(id: Int): Array[Byte] = persistentStore.get(keyOf(id))
+
+  // NOTE(review): this narrows RocksDB's Long sequence number to Int and relies on the
+  // sequence number tracking the highest stored id -- confirm both assumptions.
+  override def maxProcessedOffset: Int = persistentStore.getLatestSequenceNumber.toInt
+
+  /**
+   * Serializes `message` and writes it under `id`.
+   *
+   * @return `true` when the write succeeded, `false` when the store rejected it (the
+   *         failure is logged); errors raised by the serializer itself still propagate
+   */
+  override def store[T: ClassTag](id: Int, message: T): Boolean = {
+    val serialized: ByteBuffer = serializerInstance.serialize(message)
+    // Copy only the readable window of the buffer. `array()` would expose the whole backing
+    // array (ignoring position/limit, so possibly trailing padding) and throws for buffers
+    // that are read-only or not array-backed.
+    val bytes = new Array[Byte](serialized.remaining())
+    serialized.get(bytes)
+    try {
+      persistentStore.put(keyOf(id), bytes)
+      true
+    } catch {
+      case e: Exception =>
+        log.warn(s"Failed to store message Id: $id", e)
+        false
+    }
+  }
+
+  /** Looks up every id in `[start, end)`; ids without a stored value yield `None`. */
+  override def retrieve[T: ClassTag](start: Int, end: Int): Seq[Option[T]] =
+    (start until end).map(id => retrieve[T](id))
+
+  /** Deserializes the value stored under `id`, or returns `None` when the key is absent. */
+  override def retrieve[T: ClassTag](id: Int): Option[T] =
+    Option(get(id)).map { bytes =>
+      serializerInstance.deserialize[T](ByteBuffer.wrap(bytes), classLoader)
+    }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
new file mode 100644
index 0000000..996a0a1
--- /dev/null
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.utils
+
+import java.io.{File, IOException}
+import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
+import java.nio.file.attribute.BasicFileAttributes
+
+/** Small file-system helpers shared by the Bahir modules. */
+object BahirUtils extends Logging {
+
+  /**
+   * Deletes `dir` and everything below it on a best-effort basis.
+   *
+   * Failures to delete individual entries are logged and skipped so the walk continues;
+   * fatal JVM errors are not swallowed.
+   *
+   * @param dir root of the tree to remove
+   * @return the starting path, as returned by `Files.walkFileTree`
+   */
+  def recursiveDeleteDir(dir: File): Path = {
+    Files.walkFileTree(dir.toPath, new SimpleFileVisitor[Path]() {
+      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult =
+        deleteAndContinue(file)
+
+      override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
+        // `exc` is non-null when iterating the directory ended prematurely with an error.
+        if (exc != null) {
+          log.warn(s"Failed to list $dir", exc)
+        }
+        deleteAndContinue(dir)
+      }
+    })
+  }
+
+  // Deletes a single entry, logging (rather than propagating) any non-fatal failure.
+  private def deleteAndContinue(path: Path): FileVisitResult = {
+    try {
+      Files.delete(path)
+    } catch {
+      case scala.util.control.NonFatal(t) => log.warn(s"Failed to delete $path", t)
+    }
+    FileVisitResult.CONTINUE
+  }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala
new file mode 100644
index 0000000..776ed5a
--- /dev/null
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.utils
+
+import org.slf4j.LoggerFactory
+
+/**
+ * Mixin exposing an SLF4J logger named after the runtime class of the implementor,
+ * with a trailing `$` removed from the class name.
+ */
+trait Logging {
+  final val log: org.slf4j.Logger = {
+    // Singleton objects compile to classes whose names end in '$'; drop that suffix.
+    val loggerName = this.getClass.getName.stripSuffix("$")
+    LoggerFactory.getLogger(loggerName)
+  }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/test/resources/feeder_actor.conf
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/resources/feeder_actor.conf b/sql-streaming-akka/src/test/resources/feeder_actor.conf
new file mode 100644
index 0000000..9ec210e
--- /dev/null
+++ b/sql-streaming-akka/src/test/resources/feeder_actor.conf
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+akka {
+ loglevel = "INFO"
+ actor {
+ provider = "akka.remote.RemoteActorRefProvider"
+ }
+ remote {
+ enabled-transports = ["akka.remote.netty.tcp"]
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 0
+ }
+ log-sent-messages = on
+ log-received-messages = on
+ }
+ loggers.0 = "akka.event.slf4j.Slf4jLogger"
+ log-dead-letters-during-shutdown = "off"
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/resources/log4j.properties b/sql-streaming-akka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3706a6e
--- /dev/null
+++ b/sql-streaming-akka/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark_project.jetty=WARN
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
new file mode 100644
index 0000000..a04dc66
--- /dev/null
+++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.sql.streaming.akka
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
+import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
+import org.apache.spark.sql.execution.streaming.LongOffset
+
+import org.apache.bahir.utils.BahirUtils
+
+/**
+ * Shared fixture for the Akka streaming source tests.
+ *
+ * Each suite instance starts its own feeder actor system and uses a scratch directory under
+ * `java.io.tmpdir`. The directory is created before and removed after every test; the
+ * feeder actor system is shut down once the whole suite has run.
+ */
+class AkkaStreamSourceSuite extends SparkFunSuite with BeforeAndAfter {
+
+  protected var akkaTestUtils: AkkaTestUtils = _
+  protected val tempDir: File =
+    new File(System.getProperty("java.io.tmpdir") + "/spark-akka-persistence")
+
+  private val conf = new SparkConf().setMaster("local[4]").setAppName("AkkaStreamSourceSuite")
+  protected val spark = SparkSession.builder().config(conf).getOrCreate()
+
+  akkaTestUtils = new AkkaTestUtils
+  akkaTestUtils.setup()
+
+  before {
+    tempDir.mkdirs()
+  }
+
+  after {
+    // Release the shared RocksDB handle before its files are removed.
+    Persistence.close()
+    BahirUtils.recursiveDeleteDir(tempDir)
+  }
+
+  // Without this the feeder ActorSystem (and its remoting port) outlives the suite.
+  protected override def afterAll(): Unit = {
+    try {
+      if (akkaTestUtils != null) {
+        akkaTestUtils.shutdown()
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  protected val tmpDir: String = tempDir.getAbsolutePath
+
+  /**
+   * Creates a streaming DataFrame reading from the feeder actor.
+   *
+   * @param dir base directory; the checkpoint location is `dir/checkpoint` and the
+   *            source's persistence directory is `dir/persistence`
+   * @return the SQLContext used together with the streaming DataFrame
+   */
+  protected def createStreamingDataframe(dir: String = tmpDir): (SQLContext, DataFrame) = {
+
+    val sqlContext: SQLContext = spark.sqlContext
+
+    sqlContext.setConf("spark.sql.streaming.checkpointLocation", dir + "/checkpoint")
+
+    val dataFrame: DataFrame =
+      sqlContext.readStream.format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+        .option("urlOfPublisher", akkaTestUtils.getFeederActorUri())
+        .option("persistenceDirPath", dir + "/persistence").load()
+    (sqlContext, dataFrame)
+  }
+}
+
+/**
+ * Functional tests for the Akka source: round-tripping messages through a parquet sink,
+ * option validation, and offset recovery from the persistence directory.
+ */
+class BasicAkkaSourceSuite extends AkkaStreamSourceSuite {
+
+  // Location of the parquet sink shared by the writer and the reader below.
+  private def parquetPath: String = s"$tmpDir/parquet/t.parquet"
+
+  // Starts a parquet sink for `dataFrame` and blocks for at most `waitDuration` ms.
+  // NOTE(review): the query is left running when the wait times out -- confirm intended.
+  private def writeStreamResults(sqlContext: SQLContext, dataFrame: DataFrame,
+      waitDuration: Long): Boolean = {
+    import sqlContext.implicits._
+    val query = dataFrame.as[(String, Timestamp)]
+      .writeStream
+      .format("parquet")
+      .start(parquetPath)
+    query.awaitTermination(waitDuration)
+  }
+
+  // Reads the sink back and returns the message column only.
+  private def readBackStreamingResults(sqlContext: SQLContext): mutable.Buffer[String] = {
+    import sqlContext.implicits._
+    sqlContext.read
+      .schema(AkkaStreamConstants.SCHEMA_DEFAULT)
+      .parquet(parquetPath)
+      .as[(String, Timestamp)]
+      .map(_._1)
+      .collectAsList()
+      .asScala
+  }
+
+  // Configures the feeder, runs the stream into the sink for 10 seconds and returns the
+  // SQLContext that was used.
+  private def feedAndStream(payload: String, messageCount: Int): SQLContext = {
+    akkaTestUtils.setMessage(payload)
+    akkaTestUtils.setCountOfMessages(messageCount)
+
+    val (sqlContext: SQLContext, dataFrame: DataFrame) = createStreamingDataframe()
+    writeStreamResults(sqlContext, dataFrame, 10000)
+    sqlContext
+  }
+
+  test("basic usage") {
+    val payload = "Akka is a reactive framework"
+
+    val sqlContext = feedAndStream(payload, 1)
+    val received: mutable.Buffer[String] = readBackStreamingResults(sqlContext)
+
+    assert(received.size === 1)
+    assert(received.head === payload)
+  }
+
+  test("Send and receive 100 messages.") {
+    val payload = "Akka is a reactive framework"
+
+    val sqlContext = feedAndStream(payload, 100)
+    val received: mutable.Buffer[String] = readBackStreamingResults(sqlContext)
+
+    assert(received.size === 100)
+    assert(received.head === payload)
+  }
+
+  test("params not provided") {
+    val persistenceDirPath = tempDir.getAbsolutePath + "/persistence"
+
+    val sourceProvider = new AkkaStreamSourceProvider
+    val sqlContext: SQLContext = spark.sqlContext
+
+    // Neither "urlOfPublisher" nor "path" is supplied.
+    val options = Map("persistenceDirPath" -> persistenceDirPath)
+
+    intercept[IllegalArgumentException] {
+      sourceProvider.createSource(sqlContext, "", None, "", options)
+    }
+  }
+
+  test("Recovering offset from the last processed offset") {
+    val persistenceDirPath = tempDir.getAbsolutePath + "/persistence"
+    val payload = "Akka is a reactive framework"
+
+    val sqlContext = feedAndStream(payload, 100)
+
+    val sourceProvider = new AkkaStreamSourceProvider
+    val options = Map(
+      "urlOfPublisher" -> akkaTestUtils.getFeederActorUri(),
+      "persistenceDirPath" -> persistenceDirPath)
+
+    // A source created over the same persistence directory should report the stored offset.
+    val recoveredSource = sourceProvider.createSource(sqlContext, "", None, "", options)
+    val offset: Long = recoveredSource.getOffset.get.asInstanceOf[LongOffset].offset
+    assert(offset === 100L)
+  }
+}
+
+/** Sends a single large payload through the Akka source and checks it arrives unchanged. */
+class StressTestAkkaSource extends AkkaStreamSourceSuite {
+
+  // Run with -Xmx1024m
+  // Default allowed payload size sent to an akka actor is 128000 bytes.
+  test("Send & Receive messages of size 128000 bytes.") {
+
+    val freeMemoryMb: Long = Runtime.getRuntime.freeMemory() / (1024 * 1024)
+    log.info(s"Available memory before test run is ${freeMemoryMb}MB.")
+
+    // 124 * 1024 characters cycling through 'A'..'Z'.
+    val payloadLength = 124 * 1024
+    val message = (0 until payloadLength).map(i => ((i % 26) + 65).toChar).mkString
+
+    akkaTestUtils.setMessage(message)
+    akkaTestUtils.setCountOfMessages(1)
+
+    val (sqlContext: SQLContext, dataFrame: DataFrame) = createStreamingDataframe()
+
+    import sqlContext.implicits._
+
+    val sinkPath = s"$tmpDir/parquet/t.parquet"
+
+    val query = dataFrame.as[(String, Timestamp)]
+      .writeStream
+      .format("parquet")
+      .start(sinkPath)
+    query.awaitTermination(25000)
+
+    val storedRows = sqlContext.read
+      .schema(AkkaStreamConstants.SCHEMA_DEFAULT)
+      .parquet(sinkPath)
+      .as[(String, Timestamp)]
+    val outputMessage = storedRows.map(_._1).head()
+
+    assert(outputMessage === message)
+  }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
new file mode 100644
index 0000000..9cbfc32
--- /dev/null
+++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.bahir.sql.streaming.akka
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.util.Random
+
+import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem, Props}
+import com.typesafe.config.{Config, ConfigFactory}
+
+import org.apache.bahir.utils.Logging
+
+/**
+ * Test helper that runs a "feeder" actor system. Every time a receiver subscribes, the
+ * feeder actor pushes the configured message to all known receivers `count` times.
+ */
+class AkkaTestUtils extends Logging {
+  private val actorSystemName = "feeder-actor-system"
+  private var actorSystem: ActorSystem = _
+
+  private val feederActorName = "feederActor"
+
+  // Set from the test thread and read on the actor's dispatcher thread, hence volatile.
+  @volatile private var message: String = _
+  @volatile private var count = 1
+
+  /**
+   * Loads `feeder_actor.conf` from the classpath.
+   *
+   * The resource is parsed through its URL rather than `URL.getFile`, which returns a
+   * URL-encoded path and therefore fails to resolve when the location contains characters
+   * such as spaces.
+   */
+  def getFeederActorConfig(): Config = {
+    val configUrl = getClass.getClassLoader.getResource("feeder_actor.conf")
+    require(configUrl != null, "feeder_actor.conf was not found on the classpath")
+    ConfigFactory.parseURL(configUrl)
+  }
+
+  /** Address of the feeder actor, built from the actor system's default address. */
+  def getFeederActorUri(): String =
+    s"${actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress}" +
+      s"/user/$feederActorName"
+
+  class FeederActor extends Actor {
+
+    val rand = new Random()
+    val receivers = new mutable.LinkedHashSet[ActorRef]()
+
+    // Runs synchronously inside `receive`, so `receivers` is only touched by the actor
+    // itself and a subscription can trigger a new round of messages every time.
+    private def publishToReceivers(): Unit = {
+      val payload = message
+      val rounds = count
+      (0 until rounds).foreach(_ => receivers.foreach(_ ! payload))
+    }
+
+    override def receive: Receive = {
+      case SubscribeReceiver(receiverActor: ActorRef) =>
+        log.debug(s"received subscribe from ${receiverActor.toString}")
+        receivers += receiverActor
+        publishToReceivers()
+
+      case UnsubscribeReceiver(receiverActor: ActorRef) =>
+        log.debug(s"received unsubscribe from ${receiverActor.toString}")
+        receivers -= receiverActor
+    }
+  }
+
+  /** Starts the feeder actor system and registers the feeder actor under its fixed name. */
+  def setup(): Unit = {
+    val feederConf = getFeederActorConfig()
+
+    actorSystem = ActorSystem(actorSystemName, feederConf)
+    actorSystem.actorOf(Props(new FeederActor), feederActorName)
+  }
+
+  /** Shuts the feeder actor system down; a no-op when `setup` was never called. */
+  def shutdown(): Unit = {
+    if (actorSystem != null) {
+      actorSystem.shutdown()
+    }
+  }
+
+  def setMessage(message: String): Unit = this.message = message
+  def setCountOfMessages(messageCount: Int): Unit = count = messageCount
+}