Posted to commits@bahir.apache.org by lr...@apache.org on 2017/04/06 15:07:27 UTC

bahir git commit: [BAHIR-97] Akka as SQL Streaming datasource.

Repository: bahir
Updated Branches:
  refs/heads/master f0d9a84f7 -> 889de659c


[BAHIR-97] Akka as SQL Streaming datasource.

Closes #38.


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/889de659
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/889de659
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/889de659

Branch: refs/heads/master
Commit: 889de659c33dd56bad7193a4b69e6d05d061a2fd
Parents: f0d9a84
Author: Subhobrata Dey <sb...@gmail.com>
Authored: Sun Mar 26 21:30:30 2017 -0700
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu Apr 6 08:05:09 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |   1 +
 sql-streaming-akka/README.md                    | 111 +++++++
 .../streaming/akka/JavaAkkaStreamWordCount.java |  95 ++++++
 .../streaming/akka/AkkaStreamWordCount.scala    |  72 +++++
 sql-streaming-akka/pom.xml                      | 120 ++++++++
 .../src/main/assembly/assembly.xml              |  44 +++
 .../sql/streaming/akka/AkkaStreamSource.scala   | 294 +++++++++++++++++++
 .../bahir/sql/streaming/akka/MessageStore.scala |  83 ++++++
 .../org/apache/bahir/utils/BahirUtils.scala     |  47 +++
 .../scala/org/apache/bahir/utils/Logging.scala  |  24 ++
 .../src/test/resources/feeder_actor.conf        |  34 +++
 .../src/test/resources/log4j.properties         |  27 ++
 .../streaming/akka/AkkaStreamSourceSuite.scala  | 191 ++++++++++++
 .../sql/streaming/akka/AkkaTestUtils.scala      |  93 ++++++
 14 files changed, 1236 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 73cac1f..65129cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
   <modules>
     <module>sql-cloudant</module>
     <module>streaming-akka</module>
+    <module>sql-streaming-akka</module>
     <module>streaming-mqtt</module>
     <module>sql-streaming-mqtt</module>
     <module>streaming-twitter</module>

http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/README.md
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/README.md b/sql-streaming-akka/README.md
new file mode 100644
index 0000000..b64a8e2
--- /dev/null
+++ b/sql-streaming-akka/README.md
@@ -0,0 +1,111 @@
+A library for reading data from Akka Actors using Spark SQL Streaming (or Structured Streaming).
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-akka" % "2.2.0-SNAPSHOT"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-sql-streaming-akka_2.11</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-akka_2.11:2.2.0-SNAPSHOT
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
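+
+For example, to submit an application this way (the application jar and main class names below are placeholders, not part of this module):
+
+    $ bin/spark-submit --packages org.apache.bahir:spark-sql-streaming-akka_2.11:2.2.0-SNAPSHOT \
+        --class com.example.MyAkkaStreamApp my-akka-stream-app.jar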
+
+This library is compiled for Scala 2.11 only and is intended to support Spark 2.0 onwards.
+
+## Examples
+
+A SQL stream can be created from data received from an Akka Feeder actor as follows:
+
+        sqlContext.readStream
+                .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                .option("urlOfPublisher", "feederActorUri")
+                .load()
+                
+## Enable recovery from failures
+
+Setting the `persistenceDirPath` option enables recovery after a restart: the source restores its state to where it left off before the shutdown.
+                
+        sqlContext.readStream
+                .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                .option("urlOfPublisher", "feederActorUri")
+                .option("persistenceDirPath", "/path/to/localdir")
+                .load() 
+                       
+## Configuration options
+
+This source uses the [Akka Actor API](http://doc.akka.io/api/akka/2.4/akka/actor/Actor.html).
+
+* `urlOfPublisher` The URL of the publisher or feeder actor that the receiver actor connects to. Set this to the TCP URL of the publisher or feeder actor (see the feeder actor sketch below).
+* `persistenceDirPath` The local directory where incoming messages are stored on disk; defaults to the system temporary directory (`java.io.tmpdir`).
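+
+The `urlOfPublisher` must point to an actor that understands the `SubscribeReceiver` and `UnsubscribeReceiver` messages defined by this library. Below is a minimal sketch of such a feeder actor, modeled on the `FeederActor` used in this module's tests; the actor system name, actor name, pushed message, and config file path are illustrative assumptions, and the actor system must be configured for Akka remoting (see `src/test/resources/feeder_actor.conf` for an example configuration) so that it is reachable over TCP.
+
+        import java.io.File
+
+        import scala.collection.mutable
+
+        import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+        import com.typesafe.config.ConfigFactory
+
+        import org.apache.bahir.sql.streaming.akka.{SubscribeReceiver, UnsubscribeReceiver}
+
+        class FeederActor extends Actor {
+          // Receivers currently subscribed to this publisher
+          private val receivers = new mutable.LinkedHashSet[ActorRef]()
+
+          override def receive: Receive = {
+            case SubscribeReceiver(receiverActor) =>
+              // A receiver registered; push a message to it
+              receivers += receiverActor
+              receiverActor ! "hello from feeder"
+
+            case UnsubscribeReceiver(receiverActor) =>
+              receivers -= receiverActor
+          }
+        }
+
+        // Start a remoting-enabled actor system and register the feeder actor.
+        // Its address, e.g. akka.tcp://feeder-actor-system@127.0.0.1:<port>/user/feederActor,
+        // is what you pass as the "urlOfPublisher" option.
+        val feederConf = ConfigFactory.parseFile(new File("src/test/resources/feeder_actor.conf"))
+        val system = ActorSystem("feeder-actor-system", feederConf)
+        system.actorOf(Props(new FeederActor), "feederActor")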
+
+### Scala API
+
+An example using the Scala API to count words from an incoming message stream:
+
+        // Create DataFrame representing the stream of input lines from connection
+        // to publisher or feeder actor
+        val lines = spark.readStream
+                    .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                    .option("urlOfPublisher", urlOfPublisher)
+                    .load().as[(String, Timestamp)]
+    
+        // Split the lines into words
+        val words = lines.map(_._1).flatMap(_.split(" "))
+    
+        // Generate running word count
+        val wordCounts = words.groupBy("value").count()
+    
+        // Start running the query that prints the running counts to the console
+        val query = wordCounts.writeStream
+                    .outputMode("complete")
+                    .format("console")
+                    .start()
+    
+        query.awaitTermination()
+        
+Please see `AkkaStreamWordCount.scala` for a full example.
+   
+### Java API
+   
+An example using the Java API to count words from an incoming message stream:
+   
+        // Create DataFrame representing the stream of input lines from connection
+        // to publisher or feeder actor
+        Dataset<String> lines = spark
+                                .readStream()
+                                .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                                .option("urlOfPublisher", urlOfPublisher)
+                                .load().select("value").as(Encoders.STRING());
+    
+        // Split the lines into words
+        Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+          @Override
+          public Iterator<String> call(String s) throws Exception {
+            return Arrays.asList(s.split(" ")).iterator();
+          }
+        }, Encoders.STRING());
+    
+        // Generate running word count
+        Dataset<Row> wordCounts = words.groupBy("value").count();
+    
+        // Start running the query that prints the running counts to the console
+        StreamingQuery query = wordCounts.writeStream()
+                                .outputMode("complete")
+                                .format("console")
+                                .start();
+    
+        query.awaitTermination();   
+         
+Please see `JavaAkkaStreamWordCount.java` for a full example.

http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/examples/src/main/java/org/apache/bahir/examples/sql/streaming/akka/JavaAkkaStreamWordCount.java
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/examples/src/main/java/org/apache/bahir/examples/sql/streaming/akka/JavaAkkaStreamWordCount.java b/sql-streaming-akka/examples/src/main/java/org/apache/bahir/examples/sql/streaming/akka/JavaAkkaStreamWordCount.java
new file mode 100644
index 0000000..59146ae
--- /dev/null
+++ b/sql-streaming-akka/examples/src/main/java/org/apache/bahir/examples/sql/streaming/akka/JavaAkkaStreamWordCount.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.examples.sql.streaming.akka;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from Akka Feeder Actor system.
+ *
+ * Usage: AkkaStreamWordCount <urlOfPublisher>
+ * <urlOfPublisher> provides the uri of the publisher or feeder actor that Structured Streaming
+ * would connect to receive data.
+ *
+ * To run this on your local machine, a Feeder Actor System should be up and running.
+ *
+ */
+public final class JavaAkkaStreamWordCount {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 1) {
+      System.err.println("Usage: JavaAkkaStreamWordCount <urlOfPublisher>");
+      System.exit(1);
+    }
+
+    if (!Logger.getRootLogger().getAllAppenders().hasMoreElements()) {
+      Logger.getRootLogger().setLevel(Level.WARN);
+    }
+
+    String urlOfPublisher = args[0];
+
+    SparkConf sparkConf = new SparkConf().setAppName("JavaAkkaStreamWordCount");
+
+    // check Spark configuration for master URL, set it to local if not configured
+    if (!sparkConf.contains("spark.master")) {
+      sparkConf.setMaster("local[4]");
+    }
+
+    SparkSession spark = SparkSession.builder()
+                          .config(sparkConf)
+                          .getOrCreate();
+
+    // Create DataFrame representing the stream of input lines from connection
+    // to publisher or feeder actor
+    Dataset<String> lines = spark
+                            .readStream()
+                            .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                            .option("urlOfPublisher", urlOfPublisher)
+                            .load().select("value").as(Encoders.STRING());
+
+    // Split the lines into words
+    Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterator<String> call(String s) throws Exception {
+        return Arrays.asList(s.split(" ")).iterator();
+      }
+    }, Encoders.STRING());
+
+    // Generate running word count
+    Dataset<Row> wordCounts = words.groupBy("value").count();
+
+    // Start running the query that prints the running counts to the console
+    StreamingQuery query = wordCounts.writeStream()
+                            .outputMode("complete")
+                            .format("console")
+                            .start();
+
+    query.awaitTermination();
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/akka/AkkaStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/akka/AkkaStreamWordCount.scala b/sql-streaming-akka/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/akka/AkkaStreamWordCount.scala
new file mode 100644
index 0000000..8c4185a
--- /dev/null
+++ b/sql-streaming-akka/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/akka/AkkaStreamWordCount.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.examples.sql.streaming.akka
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from Akka Feeder Actor system.
+ *
+ * Usage: AkkaStreamWordCount <urlOfPublisher>
+ * <urlOfPublisher> provides the uri of the publisher or feeder actor that Structured Streaming
+ * would connect to receive data.
+ *
+ * To run this on your local machine, a Feeder Actor System should be up and running.
+ *
+ */
+object AkkaStreamWordCount {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 1) {
+      System.err.println("Usage: AkkaStreamWordCount <urlOfPublisher>") // scalastyle:off println
+      System.exit(1)
+    }
+
+    val urlOfPublisher = args(0)
+
+    val spark = SparkSession
+                .builder()
+                .appName("AkkaStreamWordCount")
+                .master("local[4]")
+                .getOrCreate()
+
+    import spark.implicits._
+
+    // Create DataFrame representing the stream of input lines from connection
+    // to publisher or feeder actor
+    val lines = spark.readStream
+                .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                .option("urlOfPublisher", urlOfPublisher)
+                .load().as[(String, Timestamp)]
+
+    // Split the lines into words
+    val words = lines.map(_._1).flatMap(_.split(" "))
+
+    // Generate running word count
+    val wordCounts = words.groupBy("value").count()
+
+    // Start running the query that prints the running counts to the console
+    val query = wordCounts.writeStream
+                .outputMode("complete")
+                .format("console")
+                .start()
+
+    query.awaitTermination()
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/pom.xml
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/pom.xml b/sql-streaming-akka/pom.xml
new file mode 100644
index 0000000..4d7040b
--- /dev/null
+++ b/sql-streaming-akka/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-parent_2.11</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>spark-sql-streaming-akka_2.11</artifactId>
+    <properties>
+        <sbt.project.name>sql-streaming-akka</sbt.project.name>
+    </properties>
+    <packaging>jar</packaging>
+    <name>Apache Bahir - Spark SQL Streaming Akka</name>
+    <url>http://bahir.apache.org</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-tags_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${akka.group}</groupId>
+            <artifactId>akka-actor_${scala.binary.version}</artifactId>
+            <version>${akka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${akka.group}</groupId>
+            <artifactId>akka-remote_${scala.binary.version}</artifactId>
+            <version>${akka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${akka.group}</groupId>
+            <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
+            <version>${akka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.rocksdb</groupId>
+            <artifactId>rocksdbjni</artifactId>
+            <version>5.1.2</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+        <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+            </plugin>
+
+            <!-- Assemble a jar with test dependencies for Python tests -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>test-jar-with-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <!-- Make sure the file path is same as the sbt build -->
+                            <finalName>spark-streaming-akka-test-${project.version}</finalName>
+                            <outputDirectory>${project.build.directory}/scala-${scala.binary.version}</outputDirectory>
+                            <appendAssemblyId>false</appendAssemblyId>
+                            <!-- Don't publish it since it's only for Python tests -->
+                            <attach>false</attach>
+                            <descriptors>
+                                <descriptor>src/main/assembly/assembly.xml</descriptor>
+                            </descriptors>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/assembly/assembly.xml b/sql-streaming-akka/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..58a95a0
--- /dev/null
+++ b/sql-streaming-akka/src/main/assembly/assembly.xml
@@ -0,0 +1,44 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<assembly>
+    <id>test-jar-with-dependencies</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+
+    <fileSets>
+        <fileSet>
+            <directory>${project.build.directory}/scala-${scala.binary.version}/test-classes</directory>
+            <outputDirectory></outputDirectory>
+        </fileSet>
+    </fileSets>
+
+    <dependencySets>
+        <dependencySet>
+            <useTransitiveDependencies>true</useTransitiveDependencies>
+            <scope>test</scope>
+            <unpack>true</unpack>
+            <excludes>
+                <exclude>org.apache.hadoop:*:jar</exclude>
+                <exclude>org.apache.zookeeper:*:jar</exclude>
+                <exclude>org.apache.avro:*:jar</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+
+</assembly>

http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
new file mode 100644
index 0000000..96d892f
--- /dev/null
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.sql.streaming.akka
+
+import java.nio.ByteBuffer
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, Objects}
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.concurrent.Future
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+
+import akka.actor._
+import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import akka.pattern.ask
+import akka.util.Timeout
+import com.typesafe.config.ConfigFactory
+import org.rocksdb.{Options, RocksDB}
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
+import org.apache.bahir.utils.Logging
+
+object AkkaStreamConstants {
+
+  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
+    :: StructField("timestamp", TimestampType) :: Nil)
+
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+
+  val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+    15 millis) {
+    case _: RuntimeException => Restart
+    case _: Exception => Escalate
+  }
+
+  val defaultActorSystemCreator: () => ActorSystem = () => {
+//    val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
+    val uniqueSystemName = s"streaming-actor-system"
+    val akkaConf = ConfigFactory.parseString(
+      s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+         |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+         |akka.remote.netty.tcp.port = "0"
+         |akka.loggers.0 = "akka.event.slf4j.Slf4jLogger"
+         |akka.log-dead-letters-during-shutdown = "off"
+       """.stripMargin)
+    ActorSystem(uniqueSystemName, akkaConf)
+  }
+}
+
+case class SubscribeReceiver(receiverActor: ActorRef)
+case class UnsubscribeReceiver(receiverActor: ActorRef)
+
+case class Statistics(numberOfMsgs: Int,
+                      numberOfWorkers: Int,
+                      numberOfHiccups: Int,
+                      otherInfo: String)
+
+private[akka] sealed trait ActorReceiverData
+private[akka] case class SingleItemData(item: String) extends ActorReceiverData
+private[akka] case class AskStoreSingleItemData(item: String) extends ActorReceiverData
+private[akka] case class IteratorData(iterator: Iterator[String]) extends ActorReceiverData
+private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
+private[akka] object Ack extends ActorReceiverData
+
+class AkkaStreamSource(urlOfPublisher: String,
+                       persistence: RocksDB, sqlContext: SQLContext,
+                       messageParser: String => (String, Timestamp))
+  extends Source with Logging {
+
+  override def schema: StructType = AkkaStreamConstants.SCHEMA_DEFAULT
+
+  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
+
+  private val messages = new TrieMap[Int, (String, Timestamp)]()
+
+  private val initLock = new CountDownLatch(1)
+
+  private var offset = 0
+
+  private var actorSystem: ActorSystem = _
+  private var actorSupervisor: ActorRef = _
+
+  private def fetchLastProcessedOffset(): Int = {
+    Try(store.maxProcessedOffset) match {
+      case Success(x) =>
+        log.info(s"Recovering from last stored offset $x")
+        x
+      case Failure(e) => 0
+    }
+  }
+
+  initialize()
+  private def initialize(): Unit = {
+
+    class ActorReceiver(urlOfPublisher: String) extends Actor {
+
+      lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
+
+      override def preStart(): Unit = remotePublisher ! SubscribeReceiver(context.self)
+
+      override def receive: PartialFunction[Any, Unit] = {
+        case msg: String => store(msg)
+      }
+
+      override def postStop(): Unit = remotePublisher ! UnsubscribeReceiver(context.self)
+
+      def store(iter: Iterator[String]) = {
+        context.parent ! IteratorData(iter)
+      }
+
+      def store(item: String) = {
+        context.parent ! SingleItemData(item)
+      }
+
+      def store(item: String, timeout: Timeout): Future[Unit] = {
+        context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
+      }
+    }
+
+    class Supervisor extends Actor {
+      override val supervisorStrategy = AkkaStreamConstants.defaultSupervisorStrategy
+
+      private val props = Props(new ActorReceiver(urlOfPublisher))
+      private val name = "ActorReceiver"
+      private val worker = context.actorOf(props, name)
+      log.info("Started receiver actor at:" + worker.path)
+
+      private val n: AtomicInteger = new AtomicInteger(0)
+      private val hiccups: AtomicInteger = new AtomicInteger(0)
+
+      override def receive: PartialFunction[Any, Unit] = {
+
+        case data =>
+          initLock.await()
+          var temp = offset + 1
+
+          data match {
+            case IteratorData(iterator) =>
+              log.debug("received iterator")
+              iterator.asInstanceOf[Iterator[String]].foreach(record => {
+                messages.put(temp, messageParser(record.toString))
+                temp += 1
+              })
+
+            case SingleItemData(msg) =>
+              log.debug("received single")
+              messages.put(temp, messageParser(msg))
+              n.incrementAndGet()
+
+            case AskStoreSingleItemData(msg) =>
+              log.debug("received single sync")
+              messages.put(temp, messageParser(msg))
+              n.incrementAndGet()
+              sender() ! Ack
+
+            case ByteBufferData(bytes) =>
+              log.debug("received bytes")
+              messages.put(temp, messageParser(new String(bytes.array())))
+
+            case props: Props =>
+              val worker = context.actorOf(props)
+              log.info("Started receiver worker at:" + worker.path)
+              sender() ! worker
+
+            case (props: Props, name: String) =>
+              val worker = context.actorOf(props, name)
+              log.info("Started receiver worker at:" + worker.path)
+              sender() ! worker
+
+            case _: PossiblyHarmful => hiccups.incrementAndGet()
+
+            case _: Statistics =>
+              val workers = context.children
+              sender() ! Statistics(n.get(), workers.size, hiccups.get(), workers.mkString("\n"))
+          }
+          offset = temp
+      }
+    }
+
+    actorSystem = AkkaStreamConstants.defaultActorSystemCreator()
+    actorSupervisor = actorSystem.actorOf(Props(new Supervisor), "Supervisor")
+    offset = fetchLastProcessedOffset()
+    initLock.countDown()
+  }
+
+  override def stop(): Unit = {
+    actorSupervisor ! PoisonPill
+    Persistence.close()
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
+  }
+
+  override def getOffset: Option[Offset] = {
+    if (offset == 0) {
+      None
+    } else {
+      Some(LongOffset(offset))
+    }
+  }
+
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    val startIndex = start.getOrElse(LongOffset(0L)).asInstanceOf[LongOffset].offset.toInt
+    val endIndex = end.asInstanceOf[LongOffset].offset.toInt
+    val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty
+
+    ((startIndex + 1) to endIndex).foreach { id =>
+      val element: (String, Timestamp) = messages.getOrElse(id,
+        store.retrieve[(String, Timestamp)](id).orNull)
+
+      if (!Objects.isNull(element)) {
+        data += element
+        store.store(id, element)
+      }
+      messages.remove(id, element)
+    }
+    log.trace(s"Get Batch invoked, ${data.mkString}")
+    import sqlContext.implicits._
+    data.toDF("value", "timestamp")
+  }
+}
+
+class AkkaStreamSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
+
+  override def sourceSchema(sqlContext: SQLContext, schema: Option[StructType],
+                            providerName: String, parameters: Map[String, String])
+  : (String, StructType) = ("akka", AkkaStreamConstants.SCHEMA_DEFAULT)
+
+  override def createSource(sqlContext: SQLContext, metadataPath: String,
+                            schema: Option[StructType], providerName: String,
+                            parameters: Map[String, String]): Source = {
+
+    def e(s: String) = new IllegalArgumentException(s)
+
+    val urlOfPublisher: String = parameters.getOrElse("urlOfPublisher", parameters.getOrElse("path",
+      throw e(
+        s"""Please provide url of Publisher actor by specifying path
+           | or .options("urlOfPublisher",...)""".stripMargin)))
+
+    val persistenceDirPath: String = parameters.getOrElse("persistenceDirPath",
+      System.getProperty("java.io.tmpdir"))
+
+    val messageParserWithTimestamp = (x: String) =>
+      (x, Timestamp.valueOf(AkkaStreamConstants.DATE_FORMAT.format(Calendar.getInstance().getTime)))
+
+    val persistence = Persistence.getOrCreatePersistenceInstance(persistenceDirPath)
+    new AkkaStreamSource(urlOfPublisher, persistence, sqlContext, messageParserWithTimestamp)
+  }
+
+  override def shortName(): String = "akka"
+}
+
+object Persistence {
+  var persistence: RocksDB = _
+
+  def getOrCreatePersistenceInstance(persistenceDirPath: String): RocksDB = {
+    if (Objects.isNull(persistence)) {
+      RocksDB.loadLibrary()
+      persistence = RocksDB.open(new Options().setCreateIfMissing(true), persistenceDirPath)
+    }
+    persistence
+  }
+
+  def close(): Unit = {
+    if (!Objects.isNull(persistence)) {
+      persistence.close()
+      persistence = null
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
new file mode 100644
index 0000000..9babd82
--- /dev/null
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.sql.streaming.akka
+
+import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import org.rocksdb.RocksDB
+
+import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerInstance}
+import org.apache.spark.SparkConf
+
+import org.apache.bahir.utils.Logging
+
+
+trait MessageStore {
+
+  def store[T: ClassTag](id: Int, message: T): Boolean
+
+  def retrieve[T: ClassTag](start: Int, end: Int): Seq[Option[T]]
+
+  def retrieve[T: ClassTag](id: Int): Option[T]
+
+  def maxProcessedOffset: Int
+}
+
+private[akka] class LocalMessageStore(val persistentStore: RocksDB,
+                                      val serializer: Serializer)
+  extends MessageStore with Logging {
+
+  val classLoader = Thread.currentThread().getContextClassLoader
+
+  def this(persistentStore: RocksDB, conf: SparkConf) =
+    this(persistentStore, new JavaSerializer(conf))
+
+  val serializerInstance: SerializerInstance = serializer.newInstance()
+
+  private def get(id: Int) = persistentStore.get(id.toString.getBytes)
+
+  override def maxProcessedOffset: Int = persistentStore.getLatestSequenceNumber.toInt
+
+  override def store[T: ClassTag](id: Int, message: T): Boolean = {
+    val bytes: Array[Byte] = serializerInstance.serialize(message).array()
+    try {
+      persistentStore.put(id.toString.getBytes(), bytes)
+      true
+    } catch {
+      case e: Exception => log.warn(s"Failed to store message Id: $id", e)
+        false
+    }
+  }
+
+  override def retrieve[T: ClassTag](start: Int, end: Int): Seq[Option[T]] = {
+    (start until end).map(x => retrieve(x))
+  }
+
+  override def retrieve[T: ClassTag](id: Int): Option[T] = {
+    val bytes = persistentStore.get(id.toString.getBytes)
+
+    if (bytes != null) {
+      Some(serializerInstance.deserialize(
+        ByteBuffer.wrap(bytes), classLoader))
+    } else {
+      None
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
new file mode 100644
index 0000000..996a0a1
--- /dev/null
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.utils
+
+import java.io.{File, IOException}
+import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
+import java.nio.file.attribute.BasicFileAttributes
+
+object BahirUtils extends Logging {
+
+  def recursiveDeleteDir(dir: File): Path = {
+    Files.walkFileTree(dir.toPath, new SimpleFileVisitor[Path]() {
+      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
+        try {
+          Files.delete(file)
+        } catch {
+          case t: Throwable => log.warn("Failed to delete", t)
+        }
+        FileVisitResult.CONTINUE
+      }
+
+      override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
+        try {
+          Files.delete(dir)
+        } catch {
+          case t: Throwable => log.warn("Failed to delete", t)
+        }
+        FileVisitResult.CONTINUE
+      }
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala
new file mode 100644
index 0000000..776ed5a
--- /dev/null
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.utils
+
+import org.slf4j.LoggerFactory
+
+trait Logging {
+  final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/test/resources/feeder_actor.conf
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/resources/feeder_actor.conf b/sql-streaming-akka/src/test/resources/feeder_actor.conf
new file mode 100644
index 0000000..9ec210e
--- /dev/null
+++ b/sql-streaming-akka/src/test/resources/feeder_actor.conf
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+akka {
+  loglevel = "INFO"
+  actor {
+    provider = "akka.remote.RemoteActorRefProvider"
+  }
+  remote {
+    enabled-transports = ["akka.remote.netty.tcp"]
+    netty.tcp {
+      hostname = "127.0.0.1"
+      port = 0
+    }
+    log-sent-messages = on
+    log-received-messages = on
+  }
+  loggers.0 = "akka.event.slf4j.Slf4jLogger"
+  log-dead-letters-during-shutdown = "off"
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/resources/log4j.properties b/sql-streaming-akka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3706a6e
--- /dev/null
+++ b/sql-streaming-akka/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark_project.jetty=WARN

http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
new file mode 100644
index 0000000..a04dc66
--- /dev/null
+++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.sql.streaming.akka
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
+import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
+import org.apache.spark.sql.execution.streaming.LongOffset
+
+import org.apache.bahir.utils.BahirUtils
+
+class AkkaStreamSourceSuite extends SparkFunSuite with BeforeAndAfter {
+
+  protected var akkaTestUtils: AkkaTestUtils = _
+  protected val tempDir: File =
+    new File(System.getProperty("java.io.tmpdir") + "/spark-akka-persistence")
+
+  private val conf = new SparkConf().setMaster("local[4]").setAppName("AkkaStreamSourceSuite")
+  protected val spark = SparkSession.builder().config(conf).getOrCreate()
+
+  akkaTestUtils = new AkkaTestUtils
+  akkaTestUtils.setup()
+
+  before {
+    tempDir.mkdirs()
+  }
+
+  after {
+    Persistence.close()
+    BahirUtils.recursiveDeleteDir(tempDir)
+  }
+
+  protected val tmpDir: String = tempDir.getAbsolutePath
+
+  protected def createStreamingDataframe(dir: String = tmpDir): (SQLContext, DataFrame) = {
+
+    val sqlContext: SQLContext = spark.sqlContext
+
+    sqlContext.setConf("spark.sql.streaming.checkpointLocation", dir + "/checkpoint")
+
+    val dataFrame: DataFrame =
+      sqlContext.readStream.format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+        .option("urlOfPublisher", akkaTestUtils.getFeederActorUri())
+        .option("persistenceDirPath", dir + "/persistence").load()
+    (sqlContext, dataFrame)
+  }
+}
+
+class BasicAkkaSourceSuite extends AkkaStreamSourceSuite {
+
+  private def writeStreamResults(sqlContext: SQLContext, dataFrame: DataFrame,
+                                 waitDuration: Long): Boolean = {
+    import sqlContext.implicits._
+    dataFrame.as[(String, Timestamp)].writeStream.format("parquet")
+      .start(s"$tmpDir/parquet/t.parquet").awaitTermination(waitDuration)
+  }
+
+  private def readBackSreamingResults(sqlContext: SQLContext): mutable.Buffer[String] = {
+    import sqlContext.implicits._
+    val asList =
+      sqlContext.read.schema(AkkaStreamConstants.SCHEMA_DEFAULT)
+      .parquet(s"$tmpDir/parquet/t.parquet").as[(String, Timestamp)].map(_._1)
+      .collectAsList().asScala
+    asList
+  }
+
+  test("basic usage") {
+    val message = "Akka is a reactive framework"
+
+    akkaTestUtils.setMessage(message)
+    akkaTestUtils.setCountOfMessages(1)
+
+    val (sqlContext: SQLContext, dataFrame: DataFrame) = createStreamingDataframe()
+
+    writeStreamResults(sqlContext, dataFrame, 10000)
+
+    val resultBuffer: mutable.Buffer[String] = readBackSreamingResults(sqlContext)
+
+    assert(resultBuffer.size === 1)
+    assert(resultBuffer.head === message)
+  }
+
+  test("Send and receive 100 messages.") {
+    val message = "Akka is a reactive framework"
+
+    akkaTestUtils.setMessage(message)
+    akkaTestUtils.setCountOfMessages(100)
+
+    val (sqlContext: SQLContext, dataFrame: DataFrame) = createStreamingDataframe()
+
+    writeStreamResults(sqlContext, dataFrame, 10000)
+
+    val resultBuffer: mutable.Buffer[String] = readBackSreamingResults(sqlContext)
+
+    assert(resultBuffer.size === 100)
+    assert(resultBuffer.head === message)
+  }
+
+  test("params not provided") {
+    val persistenceDirPath = tempDir.getAbsolutePath + "/persistence"
+
+    val provider = new AkkaStreamSourceProvider
+    val sqlContext: SQLContext = spark.sqlContext
+
+    val parameters = Map("persistenceDirPath" -> persistenceDirPath)
+
+    intercept[IllegalArgumentException] {
+      provider.createSource(sqlContext, "", None, "", parameters)
+    }
+  }
+
+  test("Recovering offset from the last processed offset") {
+    val persistenceDirPath = tempDir.getAbsolutePath + "/persistence"
+    val message = "Akka is a reactive framework"
+
+    akkaTestUtils.setMessage(message)
+    akkaTestUtils.setCountOfMessages(100)
+
+    val (sqlContext: SQLContext, dataFrame: DataFrame) = createStreamingDataframe()
+
+    writeStreamResults(sqlContext, dataFrame, 10000)
+
+    val provider = new AkkaStreamSourceProvider
+    val parameters = Map("urlOfPublisher" -> akkaTestUtils.getFeederActorUri(),
+      "persistenceDirPath" -> persistenceDirPath)
+
+    val offset: Long = provider.createSource(sqlContext, "", None, "", parameters)
+      .getOffset.get.asInstanceOf[LongOffset].offset
+    assert(offset === 100L)
+  }
+}
+
+class StressTestAkkaSource extends AkkaStreamSourceSuite {
+
+  // Run with -Xmx1024m
+  // Default allowed payload size sent to an akka actor is 128000 bytes.
+  test("Send & Receive messages of size 128000 bytes.") {
+
+    val freeMemory: Long = Runtime.getRuntime.freeMemory()
+
+    log.info(s"Available memory before test run is ${freeMemory / (1024 * 1024)}MB.")
+
+    val noOfMsgs = 124 * 1024
+
+    val messageBuilder = new mutable.StringBuilder()
+    for (i <- 0 until noOfMsgs) yield messageBuilder.append(((i % 26) + 65).toChar)
+
+    val message = messageBuilder.toString()
+
+    akkaTestUtils.setMessage(message)
+    akkaTestUtils.setCountOfMessages(1)
+
+    val (sqlContext: SQLContext, dataFrame: DataFrame) = createStreamingDataframe()
+
+    import sqlContext.implicits._
+
+    dataFrame.as[(String, Timestamp)].writeStream
+      .format("parquet")
+      .start(s"$tmpDir/parquet/t.parquet")
+      .awaitTermination(25000)
+
+    val outputMessage =
+      sqlContext.read.schema(AkkaStreamConstants.SCHEMA_DEFAULT)
+        .parquet(s"$tmpDir/parquet/t.parquet").as[(String, Timestamp)]
+        .map(_._1).head()
+
+    assert(outputMessage === message)
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
new file mode 100644
index 0000000..9cbfc32
--- /dev/null
+++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.bahir.sql.streaming.akka
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.util.Random
+
+import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem, Props}
+import com.typesafe.config.{Config, ConfigFactory}
+
+import org.apache.bahir.utils.Logging
+
+class AkkaTestUtils extends Logging {
+  private val actorSystemName = "feeder-actor-system"
+  private var actorSystem: ActorSystem = _
+
+  private val feederActorName = "feederActor"
+
+  private var message: String = _
+  private var count = 1
+
+  def getFeederActorConfig(): Config = {
+    val configFile = getClass.getClassLoader
+                      .getResource("feeder_actor.conf").getFile
+    ConfigFactory.parseFile(new File(configFile))
+  }
+
+  def getFeederActorUri(): String =
+    s"${actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress}" +
+      s"/user/$feederActorName"
+
+  class FeederActor extends Actor {
+
+    val rand = new Random()
+    val receivers = new mutable.LinkedHashSet[ActorRef]()
+
+    val sendMessageThread =
+      new Thread() {
+        override def run(): Unit = {
+          var counter = 0
+          while (counter < count) {
+//            Thread.sleep(500)
+            receivers.foreach(_ ! message)
+            counter += 1
+          }
+        }
+      }
+
+    override def receive: Receive = {
+      case SubscribeReceiver(receiverActor: ActorRef) =>
+        log.debug(s"received subscribe from ${receiverActor.toString}")
+        receivers += receiverActor
+        sendMessageThread.run()
+
+      case UnsubscribeReceiver(receiverActor: ActorRef) =>
+        log.debug(s"received unsubscribe from ${receiverActor.toString}")
+        receivers -= receiverActor
+    }
+  }
+
+  def setup(): Unit = {
+    val feederConf = getFeederActorConfig()
+
+    actorSystem = ActorSystem(actorSystemName, feederConf)
+    actorSystem.actorOf(Props(new FeederActor), feederActorName)
+  }
+
+  def shutdown(): Unit = {
+//    actorSystem.awaitTermination()
+    actorSystem.shutdown()
+  }
+
+  def setMessage(message: String): Unit = this.message = message
+  def setCountOfMessages(messageCount: Int): Unit = count = messageCount
+}