You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2016/11/17 12:05:27 UTC

bahir git commit: [BAHIR-64] add Akka streaming test (send/receive)

Repository: bahir
Updated Branches:
  refs/heads/master 50ecf2058 -> d43dad219


[BAHIR-64] add Akka streaming test (send/receive)

This PR adds the test suite AkkaStreamSuite.scala to
the streaming connector streaming-akka to test data
being sent and received.

Closes #24


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/d43dad21
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/d43dad21
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/d43dad21

Branch: refs/heads/master
Commit: d43dad21963d2ba338acc44d6233ff020cef7d38
Parents: 50ecf20
Author: Christian Kadner <ck...@us.ibm.com>
Authored: Wed Sep 28 12:41:35 2016 -0700
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu Nov 17 13:05:16 2016 +0100

----------------------------------------------------------------------
 NOTICE                                          |   2 +-
 .../spark/streaming/akka/ActorReceiver.scala    |   1 +
 .../spark/streaming/akka/AkkaStreamSuite.scala  | 106 +++++++++++++++++++
 3 files changed, 108 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/d43dad21/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 8bf7751..7067f1e 100644
--- a/NOTICE
+++ b/NOTICE
@@ -2,4 +2,4 @@ Apache Bahir
 Copyright (c) 2016 The Apache Software Foundation.
 
 This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file
+The Apache Software Foundation (http://www.apache.org/).

http://git-wip-us.apache.org/repos/asf/bahir/blob/d43dad21/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index e3be880..d30e380 100644
--- a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -65,6 +65,7 @@ object ActorReceiver {
     val akkaConf = ConfigFactory.parseString(
       s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
          |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+         |akka.remote.netty.tcp.port = "0"
          |""".stripMargin)
     ActorSystem(uniqueSystemName, akkaConf)
   }

http://git-wip-us.apache.org/repos/asf/bahir/blob/d43dad21/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
new file mode 100644
index 0000000..e52bf0e
--- /dev/null
+++ b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+import akka.actor._
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
+class AkkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter {
+
+  private var ssc: StreamingContext = _
+
+  private var actorSystem: ActorSystem = _
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    if (actorSystem != null) {
+      actorSystem.shutdown()
+      actorSystem.awaitTermination(30.seconds)
+      actorSystem = null
+    }
+  }
+
+  test("actor input stream") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    // we set the TCP port to "0" to have the port chosen automatically for the Feeder actor and
+    // the Receiver actor will "pick it up" from the Feeder URI when it subscribes to the Feeder
+    // actor (http://doc.akka.io/docs/akka/2.3.11/scala/remoting.html)
+    val akkaConf = ConfigFactory.parseMap(
+      Map(
+        "akka.actor.provider" -> "akka.remote.RemoteActorRefProvider",
+        "akka.remote.netty.tcp.transport-class" -> "akka.remote.transport.netty.NettyTransport",
+        "akka.remote.netty.tcp.port" -> "0").
+        asJava)
+    actorSystem = ActorSystem("test", akkaConf)
+    actorSystem.actorOf(Props(classOf[FeederActor]), "FeederActor")
+    val feederUri =
+      actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + "/user/FeederActor"
+
+    val actorStream =
+      AkkaUtils.createStream[String](ssc, Props(classOf[TestActorReceiver], feederUri),
+        "TestActorReceiver")
+    val result = new ConcurrentLinkedQueue[String]
+    actorStream.foreachRDD { rdd =>
+      rdd.collect().foreach(result.add)
+    }
+    ssc.start()
+
+    eventually(timeout(10.seconds), interval(10.milliseconds)) {
+      assert((1 to 10).map(_.toString) === result.asScala.toList)
+    }
+  }
+}
+
+case class SubscribeReceiver(receiverActor: ActorRef)
+
+class FeederActor extends Actor {
+
+  def receive: Receive = {
+    case SubscribeReceiver(receiverActor: ActorRef) =>
+      (1 to 10).foreach(i => receiverActor ! i.toString())
+  }
+}
+
+class TestActorReceiver(uriOfPublisher: String) extends ActorReceiver {
+
+  lazy private val remotePublisher = context.actorSelection(uriOfPublisher)
+
+  override def preStart(): Unit = {
+    remotePublisher ! SubscribeReceiver(self)
+  }
+
+  def receive: PartialFunction[Any, Unit] = {
+    case msg: String => store(msg)
+  }
+
+}