You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2016/11/17 12:05:27 UTC
bahir git commit: [BAHIR-64] add Akka streaming test (send/receive)
Repository: bahir
Updated Branches:
refs/heads/master 50ecf2058 -> d43dad219
[BAHIR-64] add Akka streaming test (send/receive)
This PR adds the test suite AkkaStreamSuite.scala to
the streaming connector streaming-akka to test data
being sent and received.
Closes #24
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/d43dad21
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/d43dad21
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/d43dad21
Branch: refs/heads/master
Commit: d43dad21963d2ba338acc44d6233ff020cef7d38
Parents: 50ecf20
Author: Christian Kadner <ck...@us.ibm.com>
Authored: Wed Sep 28 12:41:35 2016 -0700
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu Nov 17 13:05:16 2016 +0100
----------------------------------------------------------------------
NOTICE | 2 +-
.../spark/streaming/akka/ActorReceiver.scala | 1 +
.../spark/streaming/akka/AkkaStreamSuite.scala | 106 +++++++++++++++++++
3 files changed, 108 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/d43dad21/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 8bf7751..7067f1e 100644
--- a/NOTICE
+++ b/NOTICE
@@ -2,4 +2,4 @@ Apache Bahir
Copyright (c) 2016 The Apache Software Foundation.
This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file
+The Apache Software Foundation (http://www.apache.org/).
http://git-wip-us.apache.org/repos/asf/bahir/blob/d43dad21/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index e3be880..d30e380 100644
--- a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -65,6 +65,7 @@ object ActorReceiver {
val akkaConf = ConfigFactory.parseString(
s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+ |akka.remote.netty.tcp.port = "0"
|""".stripMargin)
ActorSystem(uniqueSystemName, akkaConf)
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/d43dad21/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
new file mode 100644
index 0000000..e52bf0e
--- /dev/null
+++ b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+import akka.actor._
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
+class AkkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter {
+
+ private var ssc: StreamingContext = _
+
+ private var actorSystem: ActorSystem = _
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ ssc = null
+ }
+ if (actorSystem != null) {
+ actorSystem.shutdown()
+ actorSystem.awaitTermination(30.seconds)
+ actorSystem = null
+ }
+ }
+
+ test("actor input stream") {
+ val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+ ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+ // we set the TCP port to "0" to have the port chosen automatically for the Feeder actor and
+ // the Receiver actor will "pick it up" from the Feeder URI when it subscribes to the Feeder
+ // actor (http://doc.akka.io/docs/akka/2.3.11/scala/remoting.html)
+ val akkaConf = ConfigFactory.parseMap(
+ Map(
+ "akka.actor.provider" -> "akka.remote.RemoteActorRefProvider",
+ "akka.remote.netty.tcp.transport-class" -> "akka.remote.transport.netty.NettyTransport",
+ "akka.remote.netty.tcp.port" -> "0").
+ asJava)
+ actorSystem = ActorSystem("test", akkaConf)
+ actorSystem.actorOf(Props(classOf[FeederActor]), "FeederActor")
+ val feederUri =
+ actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + "/user/FeederActor"
+
+ val actorStream =
+ AkkaUtils.createStream[String](ssc, Props(classOf[TestActorReceiver], feederUri),
+ "TestActorReceiver")
+ val result = new ConcurrentLinkedQueue[String]
+ actorStream.foreachRDD { rdd =>
+ rdd.collect().foreach(result.add)
+ }
+ ssc.start()
+
+ eventually(timeout(10.seconds), interval(10.milliseconds)) {
+ assert((1 to 10).map(_.toString) === result.asScala.toList)
+ }
+ }
+}
+
+case class SubscribeReceiver(receiverActor: ActorRef)
+
+class FeederActor extends Actor {
+
+ def receive: Receive = {
+ case SubscribeReceiver(receiverActor: ActorRef) =>
+ (1 to 10).foreach(i => receiverActor ! i.toString())
+ }
+}
+
+class TestActorReceiver(uriOfPublisher: String) extends ActorReceiver {
+
+ lazy private val remotePublisher = context.actorSelection(uriOfPublisher)
+
+ override def preStart(): Unit = {
+ remotePublisher ! SubscribeReceiver(self)
+ }
+
+ def receive: PartialFunction[Any, Unit] = {
+ case msg: String => store(msg)
+ }
+
+}