You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/01/20 12:43:43 UTC
[3/5] incubator-gearpump git commit: [GERAPUMP-22] Merge akka-streams
branch into master
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
new file mode 100644
index 0000000..826cdcf
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.{ClosedShape, ThrottleMode}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Stream example showing Conflate, Throttle
+ */
+object Test10 extends AkkaApp with ArgumentsParser {
+
+  // scalastyle:off println
+  /**
+   * Zips a throttled stream of "A"s with a conflated stream of "B"s and prints each pair
+   * together with the length of both sides.
+   *
+   * @param akkaConf configuration passed to this entry point
+   *                 NOTE(review): the actor system below is built from `akkaConfig`,
+   *                 presumably a member inherited from AkkaApp - confirm
+   * @param args command line arguments (not read by this example)
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    implicit val system = ActorSystem("Test10", akkaConfig)
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+
+    // Endless stream that repeats the same element
+    def stream(x: String) = Stream.continually(x)
+
+    val sourceA = Source(stream("A"))
+    val sourceB = Source(stream("B"))
+
+    // Lets one element per second through
+    val throttler: Flow[String, String, NotUsed] =
+      Flow[String].throttle(1, 1.second, 1, ThrottleMode.Shaping)
+
+    // conflate (1 input, 1 output): elements that arrive while the downstream is not
+    // ready are combined into a single pending element by the aggregate function.
+    // Here the pending elements are joined with "::" so the effect shows up in the
+    // lengths printed below.
+    val conflateFlow: Flow[String, String, NotUsed] =
+      Flow[String].conflate((acc: String, x: String) => s"$acc::$x")
+
+    // Prints every zipped pair and passes its string form on
+    val printFlow: Flow[(String, String), String, NotUsed] =
+      Flow[(String, String)].map {
+        x =>
+          println(s" lengths are : ${x._1.length} and ${x._2.length} ; ${x._1} zip ${x._2}")
+          x.toString
+      }
+
+    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+
+      val zipping = b.add(Zip[String, String]())
+
+      sourceA ~> throttler ~> zipping.in0
+      sourceB ~> conflateFlow ~> zipping.in1
+
+      zipping.out ~> printFlow ~> Sink.ignore
+
+      ClosedShape
+    })
+
+    graph.run()
+
+    // Keep the process alive until the actor system terminates (at most 60 minutes)
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
new file mode 100644
index 0000000..087c57d
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.ClosedShape
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Stream example showing Broadcast and Merge
+ */
+object Test11 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Runs a closed graph in which the numbers 1 to 10 are broadcast into two branches
+   * and merged again. Every stage adds 10, and each element passes three stages, so
+   * the values 31 to 40 are printed once per branch.
+   *
+   * @param akkaConf configuration passed to this entry point
+   * @param args command line arguments (not read by this example)
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    implicit val system = ActorSystem("Test11", akkaConfig)
+    implicit val materializer = GearpumpMaterializer()
+    // implicit val materializer =
+    // ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+    implicit val ec = system.dispatcher
+
+    val blueprint = GraphDSL.create() {
+      implicit builder: GraphDSL.Builder[NotUsed] =>
+
+        import GraphDSL.Implicits._
+
+        // A fresh "+ 10" stage on every call, one per position in the graph
+        def plusTen: Flow[Int, Int, NotUsed] = Flow[Int].map(_ + 10)
+
+        val numbers = Source(1 to 10)
+        val printer = Sink.foreach[Any](any => println(s"**** $any"))
+
+        val broadcast = builder.add(Broadcast[Int](2))
+        val merge = builder.add(Merge[Int](2))
+
+        numbers ~> plusTen ~> broadcast ~> plusTen ~> merge ~> plusTen ~> printer
+        broadcast ~> plusTen ~> merge
+
+        ClosedShape
+    }
+
+    RunnableGraph.fromGraph(blueprint).run()
+
+    // Keep the process alive until the actor system terminates (at most 60 minutes)
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
new file mode 100644
index 0000000..b4f4bce
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.stream.{ClosedShape, UniformFanInShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+
+/**
+ * Partial source, sink example
+ */
+object Test12 extends AkkaApp with ArgumentsParser{
+  // scalastyle:off println
+  /**
+   * Builds a reusable three-input "maximum" junction out of two ZipWith stages,
+   * feeds it the numbers 1, 2 and 3 and prints the value that reaches the sink.
+   *
+   * @param akkaConf configuration passed to this entry point
+   * @param args command line arguments (not read by this example)
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    import scala.concurrent.duration._
+
+    implicit val system = ActorSystem("Test12", akkaConfig)
+    // implicit val materializer = ActorMaterializer(
+    // ActorMaterializerSettings(system).withAutoFusing(false)
+    // )
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+
+    // Partial graph: max(max(in0, in1), in2) exposed as a uniform fan-in shape
+    val pickMaxOfThree = GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+
+      val firstMax = b.add(ZipWith[Int, Int, Int]((l, r) => math.max(l, r)))
+      val secondMax = b.add(ZipWith[Int, Int, Int]((l, r) => math.max(l, r)))
+
+      firstMax.out ~> secondMax.in0
+
+      UniformFanInShape(secondMax.out, firstMax.in0, firstMax.in1, secondMax.in1)
+    }
+
+    val resultSink = Sink.head[Int]
+
+    val closed = GraphDSL.create(resultSink) { implicit b =>
+      sink =>
+        import GraphDSL.Implicits._
+
+        // Adding the partial graph to the builder yields its inlets and outlet
+        val maxOfThree = b.add(pickMaxOfThree)
+
+        Source.single(1) ~> maxOfThree.in(0)
+        Source.single(2) ~> maxOfThree.in(1)
+        Source.single(3) ~> maxOfThree.in(2)
+
+        maxOfThree.out ~> sink.in
+
+        ClosedShape
+    }
+
+    // The materialized value is the sink's future of the first element
+    val max: Future[Int] = RunnableGraph.fromGraph(closed).run()
+    max.foreach(x => println(s"maximum of three numbers : $x"))
+
+    // Keep the process alive until the actor system terminates (at most 60 minutes)
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
new file mode 100644
index 0000000..2e036cb
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.time._
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.Implicits._
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.Random
+
+/**
+ * GroupBy example
+ */
+
+/*
+// Original example
+val f = Source
+ .tick(0.seconds, 1.second, "")
+ .map { _ =>
+ val now = System.currentTimeMillis()
+ val delay = random.nextInt(8)
+ MyEvent(now - delay * 1000L)
+ }
+ .statefulMapConcat { () =>
+ val generator = new CommandGenerator()
+ ev => generator.forEvent(ev)
+ }
+ .groupBy(64, command => command.w)
+ .takeWhile(!_.isInstanceOf[CloseWindow])
+ .fold(AggregateEventData((0L, 0L), 0))({
+ case (agg, OpenWindow(window)) => agg.copy(w = window)
+ // always filtered out by takeWhile
+ case (agg, CloseWindow(_)) => agg
+ case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
+ })
+ .async
+ .mergeSubstreams
+ .runForeach { agg =>
+ println(agg.toString)
+ }
+ */
+object Test13 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+
+  /**
+   * Emits one event per second carrying a timestamp up to seven seconds in the past,
+   * turns each event into window commands, groups the commands by window and prints
+   * the event count of every window once its close command arrives.
+   *
+   * @param akkaConf configuration passed to this entry point
+   * @param args command line arguments (not read by this example)
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    implicit val system = ActorSystem("Test13", akkaConfig)
+    implicit val materializer = GearpumpMaterializer()
+
+    val random = new Random()
+
+    // One randomly delayed event per tick
+    val events = Source
+      .tick(0.seconds, 1.second, "tick data")
+      .map { _ =>
+        val now = System.currentTimeMillis()
+        val delay = random.nextInt(8)
+        MyEvent(now - delay * 1000L)
+      }
+
+    // A fresh CommandGenerator per materialization keeps the watermark state
+    val commands = events.statefulMapConcat { () =>
+      val generator = new CommandGenerator()
+      ev => generator.forEvent(ev)
+    }
+
+    val result = commands
+      .groupBy2(command => command.w)
+      .takeWhile(command => !command.isInstanceOf[CloseWindow])
+      .fold(AggregateEventData((0L, 0L), 0))({
+        case (agg, OpenWindow(window)) => agg.copy(w = window)
+        // always filtered out by takeWhile
+        case (agg, CloseWindow(_)) => agg
+        case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
+      })
+      .runForeach { agg =>
+        println(agg.toString)
+      }
+
+    // Keep the process alive until the actor system terminates (at most 60 minutes)
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+ case class MyEvent(timestamp: Long)
+
+ type Window = (Long, Long)
+
+  /** Sliding-window arithmetic: windows are 10 seconds long and start every second. */
+  object Window {
+    val WindowLength = 10.seconds.toMillis
+    val WindowStep = 1.second.toMillis
+    val WindowsPerEvent = (WindowLength / WindowStep).toInt
+
+    /**
+     * All windows, as (start, end) pairs in milliseconds, generated for a timestamp.
+     *
+     * @param ts event timestamp in milliseconds
+     * @return the `WindowsPerEvent` consecutive windows, the last of which starts at
+     *         `ts` rounded down to a multiple of `WindowStep`
+     */
+    def windowsFor(ts: Long): Set[Window] = {
+      val firstWindowStart = ts - ts % WindowStep - WindowLength + WindowStep
+      (0 until WindowsPerEvent).map { i =>
+        val start = firstWindowStart + i * WindowStep
+        (start, start + WindowLength)
+      }.toSet
+    }
+  }
+
+ sealed trait WindowCommand {
+ def w: Window
+ }
+
+ case class OpenWindow(w: Window) extends WindowCommand
+
+ case class CloseWindow(w: Window) extends WindowCommand
+
+ case class AddToWindow(ev: MyEvent, w: Window) extends WindowCommand
+
+  /**
+   * Stateful translator from events to window commands.
+   *
+   * It tracks a watermark (the highest timestamp seen so far minus `MaxDelay`) and
+   * the set of windows that have been opened but not yet closed.
+   */
+  class CommandGenerator {
+    private val MaxDelay = 5.seconds.toMillis
+    private var watermark = 0L
+    private val openWindows = mutable.Set[Window]()
+
+    /**
+     * Produces the commands caused by one event.
+     *
+     * @param ev the incoming event
+     * @return `Nil` when the event is older than the watermark; otherwise the open
+     *         commands for windows first seen now, then the close commands for open
+     *         windows that ended before the watermark and do not contain the event,
+     *         then one add command per window of the event
+     */
+    def forEvent(ev: MyEvent): List[WindowCommand] = {
+      watermark = math.max(watermark, ev.timestamp - MaxDelay)
+      if (ev.timestamp < watermark) {
+        println(s"Dropping event with timestamp: ${tsToString(ev.timestamp)}")
+        Nil
+      } else {
+        val eventWindows = Window.windowsFor(ev.timestamp)
+
+        // Take a snapshot first: removing from a mutable set while it is being
+        // traversed is not safe and may skip or repeat elements.
+        val expired = openWindows.filter { ow =>
+          !eventWindows.contains(ow) && ow._2 < watermark
+        }.toList
+        openWindows --= expired
+        val closeCommands: List[WindowCommand] = expired.map(w => CloseWindow(w))
+
+        // Expired windows never belong to the event, so closing them first cannot
+        // change which of the event's windows count as new.
+        val fresh = eventWindows.filterNot(w => openWindows.contains(w))
+        openWindows ++= fresh
+        val openCommands: List[WindowCommand] = fresh.map(w => OpenWindow(w)).toList
+
+        val addCommands: List[WindowCommand] =
+          eventWindows.map(w => AddToWindow(ev, w)).toList
+
+        openCommands ++ closeCommands ++ addCommands
+      }
+    }
+  }
+
+ case class AggregateEventData(w: Window, eventCount: Int) {
+ override def toString: String =
+ s"Between ${tsToString(w._1)} and ${tsToString(w._2)}, there were $eventCount events."
+ }
+
+ def tsToString(ts: Long): String = OffsetDateTime
+ .ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
+ .toLocalTime
+ .toString
+ // scalastyle:on println
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
new file mode 100644
index 0000000..c436130
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.io.File
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl._
+import akka.util.ByteString
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+object Test14 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Streams the running products of 1 to 100 (starting from 1) to two file sinks,
+   * one fed directly and one fed through a buffer and a throttle.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments (not read by this example)
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test14", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    // Sink that prints each line prefixed with the file name and also writes it,
+    // newline-terminated, to that file; materializes the file sink's IOResult.
+    def lineSink(filename: String): Sink[String, Future[IOResult]] = {
+      val console = Sink.foreach[String](line => println(s"$filename: $line"))
+      val file = FileIO.toPath(new File(filename).toPath)
+      Flow[String]
+        .alsoTo(console)
+        .map(line => ByteString(line + "\n"))
+        .toMat(file)(Keep.right)
+    }
+
+    // A fresh throttle stage (one element per second) on every call
+    def onePerSecond = Flow[String].throttle(1, 1.second, 1, ThrottleMode.shaping)
+
+    val source: Source[Int, NotUsed] = Source(1 to 100)
+    val factorials: Source[BigInt, NotUsed] = source.scan(BigInt(1))(_ * _)
+    val sink1 = lineSink("factorial1.txt")
+    val sink2 = lineSink("factorial2.txt")
+    // Defined but not wired into the graph below
+    val slowSink2 = Flow[String].via(onePerSecond).toMat(sink2)(Keep.right)
+    val bufferedSink2 = Flow[String]
+      .buffer(50, OverflowStrategy.backpressure)
+      .via(onePerSecond)
+      .toMat(sink2)(Keep.right)
+
+    val closed = GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+      val bcast = b.add(Broadcast[String](2))
+      factorials.map(_.toString) ~> bcast.in
+      bcast.out(0) ~> sink1
+      bcast.out(1) ~> bufferedSink2
+      ClosedShape
+    }
+
+    RunnableGraph.fromGraph(closed).run()
+
+    // Keep the process alive until the actor system terminates (at most 60 minutes)
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
new file mode 100644
index 0000000..f4e4dbd
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+object Test15 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  // "gearpump": optional boolean flag, false by default
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  /**
+   * Runs a graph with a feedback edge (F back into C) on either the Gearpump
+   * materializer or a plain ActorMaterializer with auto-fusing disabled.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments; the "gearpump" flag selects the materializer
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test15", akkaConf)
+    implicit val materializer: ActorMaterializer =
+      if (config.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(
+          ActorMaterializerSettings(system).withAutoFusing(false)
+        )
+      }
+
+    import akka.stream.scaladsl.GraphDSL.Implicits._
+
+    val closed = GraphDSL.create() { implicit builder =>
+      val A = builder.add(Source.single(0)).out
+      val B = builder.add(Broadcast[Int](2))
+      val C = builder.add(Merge[Int](2).named("C"))
+      val D = builder.add(Flow[Int].map(_ + 1).named("D"))
+      val E = builder.add(Balance[Int](2).named("E"))
+      val F = builder.add(Merge[Int](2).named("F"))
+      val G = builder.add(Sink.foreach(println).named("G")).in
+
+      C <~ F
+      A ~> B ~> C ~> F
+      B ~> D ~> E ~> F
+      E ~> G
+
+      ClosedShape
+    }
+    RunnableGraph.fromGraph(closed).run()
+
+    // Keep the process alive until the actor system terminates (at most 60 minutes)
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
new file mode 100644
index 0000000..9691496
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, LoggerSink}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * All remote
+ */
+object Test16 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Reads a fixed list of item names from a Gearpump collection source, keeps the
+   * ones starting with "red", turns them into order messages and sends them to a
+   * Gearpump logger sink.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments (not read by this example)
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test16", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val sink = GearSink.to(new LoggerSink[String])
+    val items =
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+    val source = GearSource.from[String](new CollectionDataSource(items))
+
+    source
+      .filter(item => item.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+      .runWith(sink)
+
+    // Keep the process alive until the actor system terminates (at most 60 minutes)
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
new file mode 100644
index 0000000..a6049cd
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl._
+import akka.stream.{ActorMaterializer, ClosedShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ *
+ * This tests how different Materializers can be used together in an explicit way.
+ *
+ */
+object Test2 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Materializes a filtering flow with the Gearpump materializer between two bridge
+   * endpoints, then uses a plain ActorMaterializer to push a list of item names into
+   * the entry side and forward whatever comes out of the exit side to an Echo actor.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments, passed to `parse`
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test2", akkaConf)
+    val gearpumpMaterializer = GearpumpMaterializer()
+
+    val echo = system.actorOf(Props(new Echo()))
+    val bridgeIn = GearSource.bridge[String, String]
+    val bridgeOut = GearSink.bridge[String, String]
+
+    val orderRedItems = Flow[String]
+      .filter(item => item.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+    val (entry, exit) = orderRedItems.runWith(bridgeIn, bridgeOut)(gearpumpMaterializer)
+
+    val actorMaterializer = ActorMaterializer()
+
+    val externalSource = Source(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+    )
+    // "COMPLETE" is the message the actor receives when the stream completes
+    val externalSink = Sink.actorRef(echo, "COMPLETE")
+
+    val closed = GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+      externalSource ~> Sink.fromSubscriber(entry)
+      Source.fromPublisher(exit) ~> externalSink
+      ClosedShape
+    }
+    RunnableGraph.fromGraph(closed).run()(actorMaterializer)
+
+    // Keep the process alive until the actor system terminates (at most 60 minutes)
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Prints every reference-typed message it receives. */
+  class Echo extends Actor {
+    def receive: Receive = {
+      case msg: AnyRef =>
+        println("Confirm received: " + msg)
+    }
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
new file mode 100644
index 0000000..24faeb3
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSource
+import akka.stream.scaladsl.Sink
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * read from remote and write to local
+ */
+object Test3 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  // "gearpump": optional boolean flag, false by default
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  /**
+   * Reads a fixed list of item names from a Gearpump collection source, keeps the
+   * ones starting with "red", turns them into order messages and sends them to a
+   * local Echo actor.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments; the "gearpump" flag selects the materializer
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test3", akkaConf)
+    implicit val materializer: ActorMaterializer =
+      if (config.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(
+          ActorMaterializerSettings(system).withAutoFusing(false)
+        )
+      }
+
+    val echo = system.actorOf(Props(new Echo()))
+    // "COMPLETE" is the message the actor receives when the stream completes
+    val sink = Sink.actorRef(echo, "COMPLETE")
+    val items =
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+    val source = GearSource.from[String](new CollectionDataSource(items))
+
+    source
+      .filter(item => item.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+      .runWith(sink)
+
+    // Keep the process alive until the actor system terminates (at most 60 minutes)
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Prints every reference-typed message it receives. */
+  class Echo extends Actor {
+    def receive: Receive = {
+      case msg: AnyRef =>
+        println("Confirm received: " + msg)
+    }
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
new file mode 100644
index 0000000..6a44a35
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSink
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.scalaapi.LoggerSink
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Example that reads from a local Akka Streams source and writes to a Gearpump sink.
+ *
+ * Items starting with "red" are turned into order messages and handed to a
+ * [[LoggerSink]] wrapped by [[GearSink]].
+ */
+object Test4 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test4", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val items = List(
+      "red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+    val orders = Source(items)
+      .filter(_.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+    orders.runWith(GearSink.to(new LoggerSink[String]))
+
+    // Blocks until the actor system terminates, for at most 60 minutes.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
new file mode 100644
index 0000000..ad87a97
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.ClosedShape
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Fan-out example: a stream of pairs is split with `Unzip` and each half is
+ * routed to its own actor sink. Both sinks deliver to the same [[Test5.Echo]] actor.
+ */
+object Test5 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test5", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val echo = system.actorOf(Props(new Echo()))
+    val source = Source(List(("male", "24"), ("female", "23")))
+
+    RunnableGraph.fromGraph(
+      GraphDSL.create() { implicit b =>
+        import GraphDSL.Implicits._
+        val unzip = b.add(Unzip[String, String]())
+        // One sink per Unzip outlet; previously sink2 was created but out1 was
+        // wired to sink1 a second time.
+        val sink1 = Sink.actorRef(echo, "COMPLETE")
+        val sink2 = Sink.actorRef(echo, "COMPLETE")
+        source ~> unzip.in
+        unzip.out0 ~> sink1
+        unzip.out1 ~> sink2
+        ClosedShape
+      }
+    ).run()
+
+    // Blocks until the actor system terminates, for at most 60 minutes.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Prints every non-null message it receives. */
+  class Echo extends Actor {
+    def receive: Receive = {
+      case any: AnyRef =>
+        println("Confirm received: " + any)
+    }
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
new file mode 100644
index 0000000..a525471
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl.Sink
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSource
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * WordCount example
+ * Test GroupBy2 (groupBy which uses SubFlow is not implemented yet)
+ */
+
+import org.apache.gearpump.akkastream.scaladsl.Implicits._
+
+object Test6 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  /**
+   * Splits a fixed set of lines into words, groups them with `groupBy2`, reduces the
+   * (word, 1) pairs by adding up the counts and sends the result to an [[Echo]] actor.
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test6", akkaConf)
+    // GearpumpMaterializer when the flag is set, otherwise a non-fusing ActorMaterializer.
+    implicit val materializer: ActorMaterializer =
+      if (config.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+      }
+    val echo = system.actorOf(Props(Echo()))
+    val sink = Sink.actorRef(echo, "COMPLETE")
+    val lines = List(
+      "this is a good start",
+      "this is a good time",
+      "time to start",
+      "congratulations",
+      "green plant",
+      "blue sky")
+    val source = GearSource.from[String](new CollectionDataSource(lines))
+    source
+      .mapConcat(line => line.split(" ").toList)
+      .groupBy2(word => word)
+      .map(word => (word, 1))
+      .reduce((left, right) => (left._1, left._2 + right._2))
+      .log("word-count")
+      .runWith(sink)
+
+    // Blocks until the actor system terminates, for at most 60 minutes.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Prints every non-null message it receives. */
+  case class Echo() extends Actor {
+    def receive: Receive = {
+      case message: AnyRef => println("Confirm received: " + message)
+    }
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
new file mode 100644
index 0000000..8c837af
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.{Broadcast, Merge, Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * Demonstrates the simplified combinator API: sources and sinks are combined
+ * through junctions like Broadcast[T], Balance[T], Merge[In] and Concat[A]
+ * without writing an explicit Graph DSL block.
+ */
+object Test7 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test7", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+
+    // Two single-element sources merged into one.
+    val mergedSource = Source.combine(Source(List(1)), Source(List(2)))(Merge(_))
+
+    // Every merged element is broadcast to both printing sinks.
+    val sinkA = Sink.foreach[Int](element => println(s"In SinkA : $element"))
+    val sinkB = Sink.foreach[Int](element => println(s"In SinkB : $element"))
+    val broadcastSink = Sink.combine(sinkA, sinkB)(Broadcast[Int](_))
+    mergedSource.runWith(broadcastSink)
+
+    // Blocks until the actor system terminates, for at most 60 minutes.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
new file mode 100644
index 0000000..ad2ac61
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer}
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+
+/**
+ * Stream example that folds the elements of a finite source into their sum
+ * and prints the total once the stream has completed successfully.
+ */
+object Test8 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test8", akkaConf)
+    // GearpumpMaterializer when the flag is set, otherwise a non-fusing ActorMaterializer.
+    implicit val materializer: ActorMaterializer =
+      if (config.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+      }
+    implicit val ec = system.dispatcher
+
+    // The numbers 1 to 100, summed up by the folding sink.
+    val numbers: Source[Int, NotUsed] = Source(Stream.from(1).take(100))
+    val summing: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
+
+    val total: Future[Int] = numbers.runWith(summing)
+    // Only a successful result is reported.
+    total.foreach(sum => println(s"Sum of stream elements => $sum"))
+
+    // Blocks until the actor system terminates, for at most 60 minutes.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
new file mode 100644
index 0000000..66414e0
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape}
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+
+/**
+ * Stream example showing Broadcast: every element is sent through two logging
+ * flows, the two branches are merged again and delivered to an actor sink.
+ */
+object Test9 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test9", akkaConf)
+    // GearpumpMaterializer when the flag is set, otherwise a non-fusing ActorMaterializer.
+    implicit val materializer: ActorMaterializer =
+      if (config.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+      }
+    implicit val ec = system.dispatcher
+
+    val sinkActor = system.actorOf(Props(new SinkActor()))
+    val source = Source(1 to 5)
+    val sink = Sink.actorRef(sinkActor, "COMPLETE")
+    // Both flows pass the element through unchanged and only log it.
+    val flowA: Flow[Int, Int, NotUsed] = Flow[Int].map { element =>
+      println(s"processing broadcasted element : $element in flowA")
+      element
+    }
+    val flowB: Flow[Int, Int, NotUsed] = Flow[Int].map { element =>
+      println(s"processing broadcasted element : $element in flowB")
+      element
+    }
+
+    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+      val broadcast = b.add(Broadcast[Int](2))
+      val merge = b.add(Merge[Int](2))
+      source ~> broadcast ~> flowA ~> merge ~> sink
+      broadcast ~> flowB ~> merge
+      ClosedShape
+    })
+
+    graph.run()
+
+    // Blocks until the actor system terminates, for at most 60 minutes.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Prints every non-null message it receives. */
+  class SinkActor extends Actor {
+    def receive: Receive = {
+      case message: AnyRef => println("Confirm received: " + message)
+    }
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
new file mode 100644
index 0000000..2a1e7ff
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.io.{File, FileInputStream}
+import java.util.zip.GZIPInputStream
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.scaladsl._
+import akka.stream.{ClosedShape, IOResult}
+import akka.util.ByteString
+import org.apache.gearpump.akkastream.graph.GraphPartitioner
+import org.apache.gearpump.akkastream.{GearAttributes, GearpumpMaterializer}
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+import org.json4s.JsonAST.JString
+import org.json4s.jackson.JsonMethods
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
+
+/**
+ * this example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/
+ * which showcases running Akka Streams DSL across JVMs on Gearpump
+ *
+ * Usage: output/target/pack/bin/gear app
+ *  -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar
+ *  -input wikidata-${DATE}-all.json.gz -languages en,de
+ *
+ * (Note: Wikipedia data can be downloaded from https://dumps.wikimedia.org/wikidatawiki/entities/)
+ *
+ */
+object WikipediaApp extends ArgumentsParser with AkkaApp {
+
+  /**
+   * A Wikidata item.
+   *
+   * @param id    value of the item's "id" field
+   * @param sites page title per language, for the requested languages that have a sitelink
+   */
+  case class WikidataElement(id: String, sites: Map[String, String])
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "input" -> CLIOption[String]("<Wikidata JSON dump>", required = true),
+    "languages" -> CLIOption[String]("<languages to take into account>", required = true)
+  )
+
+  /**
+   * Builds and runs the graph: parsed elements are broadcast to a progress logger and
+   * to the title comparison, whose results are counted and printed on completion.
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val parsed = parse(args)
+    val input = new File(parsed.getString("input"))
+    val langs = parsed.getString("languages").split(",")
+
+    implicit val system = ActorSystem("WikipediaApp", akkaConf)
+    implicit val materializer =
+      GearpumpMaterializer(GraphPartitioner.TagAttributeStrategy)
+    import system.dispatcher
+
+    val elements = source(input).via(parseJson(langs))
+
+    val g = RunnableGraph.fromGraph(
+      GraphDSL.create(count) { implicit b =>
+        sinkCount => {
+          import GraphDSL.Implicits._
+          val broadcast = b.add(Broadcast[WikidataElement](2))
+          elements ~> broadcast ~> logEveryNSink(1000)
+          broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
+          ClosedShape
+        }
+      }
+    )
+
+    g.run().onComplete {
+      case Success((t, f)) => printResults(t, f)
+      case Failure(tr) =>
+        // scalastyle:off println
+        println("Something went wrong")
+        // scalastyle:on println
+        // Do not drop the cause: without it a failed run cannot be diagnosed.
+        tr.printStackTrace()
+    }
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /**
+   * Streams the lines of a gzip-compressed file as UTF-8 strings.
+   *
+   * The input stream is created inside the factory passed to `fromInputStream`, so the
+   * file is not opened when this method is called and every invocation of the factory
+   * gets a fresh stream instead of sharing a single, possibly already consumed, instance.
+   *
+   * @param file gzip-compressed, newline-delimited input file
+   * @return source of decoded lines
+   */
+  def source(file: File): Source[String, Future[IOResult]] = {
+    StreamConverters.fromInputStream(
+      () => new GZIPInputStream(new FileInputStream(file), 65536))
+      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
+      .map(x => x.decodeString("utf-8"))
+  }
+
+  /**
+   * Flow that parses each line with [[parseItem]] (up to 8 lines concurrently, results
+   * unordered) and drops the lines that did not yield an element.
+   */
+  def parseJson(langs: Seq[String])(implicit ec: ExecutionContext):
+    Flow[String, WikidataElement, NotUsed] =
+    Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect({
+      case Some(v) => v
+    })
+
+  /**
+   * Parses one JSON line into a [[WikidataElement]].
+   *
+   * @return None if the line is not valid JSON, has no string "id" or has no
+   *         "<lang>wiki" sitelink title for any of the requested languages
+   */
+  def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = {
+    Try(JsonMethods.parse(line)).toOption.flatMap { json =>
+      json \ "id" match {
+        case JString(itemId) =>
+
+          val sites: Seq[(String, String)] = for {
+            lang <- langs
+            JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title"
+          } yield lang -> title
+
+          if (sites.isEmpty) None
+          else Some(WikidataElement(id = itemId, sites = sites.toMap))
+
+        case _ => None
+      }
+    }
+  }
+
+  /**
+   * Sink that counts the elements it sees and prints every n-th one
+   * (elements number 0, n, 2n, ...).
+   */
+  def logEveryNSink[T](n: Int): Sink[T, Future[Int]] = Sink.fold(0) { (x, y: T) =>
+    if (x % n == 0) {
+      // scalastyle:off println
+      println(s"Processing element $x: $y")
+      // scalastyle:on println
+    }
+    x + 1
+  }
+
+  /**
+   * Flow that keeps the elements having a title for exactly the given languages and
+   * emits whether all of those titles are equal. Tagged with `GearAttributes.remote`.
+   */
+  def checkSameTitles(langs: Set[String]):
+    Flow[WikidataElement, Boolean, NotUsed] = Flow[WikidataElement]
+    .filter(_.sites.keySet == langs)
+    .map { x =>
+      val titles = x.sites.values
+      titles.forall(_ == titles.head)
+    }.withAttributes(GearAttributes.remote)
+
+  /** Sink that counts `true` and `false` inputs as a (trueCount, falseCount) pair. */
+  def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
+    case ((t, f), true) => (t + 1, f)
+    case ((t, f), false) => (t, f + 1)
+  }
+
+  /**
+   * Prints both counters and their share of the total.
+   *
+   * @param t number of items whose titles are all the same
+   * @param f number of items with differing titles
+   */
+  def printResults(t: Int, f: Int): Unit = {
+    val message = s"""
+      | Number of items with the same title: $t
+      | Number of items with the different title: $f
+      | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)}
+      """.stripMargin
+    // scalastyle:off println
+    println(message)
+    // scalastyle:on println
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
new file mode 100644
index 0000000..f7919c0
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.stream.{Shape, SinkShape, SourceShape}
+import org.apache.gearpump.akkastream.GearAttributes
+import org.apache.gearpump.akkastream.GearAttributes.{Local, Location, Remote}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
+import org.apache.gearpump.akkastream.module._
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.fusing.GraphStageModule
+import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource}
+import akka.stream.impl.{SinkModule, SourceModule}
+import org.apache.gearpump.util.Graph
+
+/**
+ *
+ * GraphPartitioner is used to decide which part will be rendered locally
+ * and which part should be rendered remotely.
+ *
+ * We will cut the graph based on the [[Strategy]] provided.
+ *
+ * For example, for the following graph, we can cut the graph to
+ * two parts, each part will be a Sub Graph. The top SubGraph
+ * can be materialized remotely. The bottom part can be materialized
+ * locally.
+ *
+ *                 AtomicModule2 -> AtomicModule4
+ *                  /|                         \
+ *                 /                            \
+ * -----------cut line -------------cut line ----------
+ *               /                                \
+ *              /                                 \|
+ *     AtomicModule1                        AtomicModule5
+ *              \                                 /|
+ *               \                               /
+ *               \|                             /
+ *                AtomicModule3 ----------------
+ *
+ * @see [[akka.stream.impl.MaterializerSession]] for more information of how Graph is organized.
+ *
+ */
+class GraphPartitioner(strategy: Strategy) {
+
+  /**
+   * Splits the module graph into a remote and a local sub graph.
+   *
+   * Dummy modules are removed from a copy of the input, every remaining module is
+   * tagged with a [[Location]] by the strategy, and the graph is cut along the tags.
+   *
+   * @param moduleGraph module graph to split
+   * @return the remote sub graph followed by the local sub graph
+   */
+  def partition(moduleGraph: Graph[Module, Edge]): List[SubGraph] = {
+    val cleaned = removeDummyModule(moduleGraph)
+    doPartition(cleaned, tag(cleaned, strategy))
+  }
+
+  /**
+   * Distributes vertices and edges over a local and a remote graph according to `tags`.
+   * An edge crossing the cut is split in two around a newly created bridge module,
+   * unless the remote end of that edge already is a bridge.
+   */
+  private def doPartition(graph: Graph[Module, Edge], tags: Map[Module, Location]):
+    List[SubGraph] = {
+    val local = Graph.empty[Module, Edge]
+    val remote = Graph.empty[Module, Edge]
+
+    // Every vertex goes to exactly one of the two graphs.
+    graph.vertices.foreach { module =>
+      val owner = if (tags(module) == Local) local else remote
+      owner.addVertex(module)
+    }
+
+    graph.edges.foreach { case connection @ (upstream, edge, downstream) =>
+      (tags(upstream), tags(downstream)) match {
+        case (Local, Local) =>
+          local.addEdge(connection)
+        case (Remote, Remote) =>
+          remote.addEdge(connection)
+        case (Local, Remote) =>
+          downstream match {
+            case _: BridgeModule[_, _, _] =>
+              local.addEdge(upstream, edge, downstream)
+            case _ =>
+              // no bridge yet: insert a source bridge and split the edge around it
+              val bridge = new SourceBridgeModule[AnyRef, AnyRef]()
+              remote.addEdge(bridge, Edge(bridge.outPort, edge.to), downstream)
+              local.addEdge(upstream, Edge(edge.from, bridge.inPort), bridge)
+          }
+        case (Remote, Local) =>
+          upstream match {
+            case _: BridgeModule[_, _, _] =>
+              local.addEdge(upstream, edge, downstream)
+            case _ =>
+              // no bridge yet: insert a sink bridge and split the edge around it
+              val bridge = new SinkBridgeModule[AnyRef, AnyRef]()
+              remote.addEdge(upstream, Edge(edge.from, bridge.inPort), bridge)
+              local.addEdge(bridge, Edge(bridge.outPort, edge.to), downstream)
+          }
+      }
+    }
+
+    List(new RemoteGraph(remote), new LocalGraph(local))
+  }
+
+  /** Asks the strategy for the location of every vertex of the graph. */
+  private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] = {
+    graph.vertices.map(vertex => vertex -> strategy(vertex)).toMap
+  }
+
+  /** Returns a copy of the input graph without its [[DummyModule]] vertices. */
+  private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = {
+    val graph = inputGraph.copy
+    val dummies = graph.vertices.filter {
+      case _: DummyModule => true
+      case _ => false
+    }
+    dummies.foreach(dummy => graph.removeVertex(dummy))
+    graph
+  }
+}
+
+/**
+ * Predefined partitioning strategies and a factory for [[GraphPartitioner]].
+ */
+object GraphPartitioner {
+
+ /** Maps a module to the [[Location]] it should be materialized at. */
+ type Strategy = PartialFunction[Module, Location]
+
+ /**
+ * Bridge, Gearpump task and groupBy modules are Remote; source and sink modules,
+ * and any module with a source or sink shape, are Local; all other shapes are Remote.
+ *
+ * NOTE(review): the last case, `remaining: Module`, with its inner
+ * `otherShapes: Shape` branch looks like it matches every non-null module. If so,
+ * this partial function is total and the `orElse` fallbacks of the strategies
+ * below are never consulted - confirm whether that is intended.
+ */
+ val BaseStrategy: Strategy = {
+ case source: BridgeModule[_, _, _] =>
+ Remote
+ case task: GearpumpTaskModule =>
+ Remote
+ case groupBy: GroupByModule[_, _] =>
+ // TODO: groupBy is not supported by local materializer
+ Remote
+ case source: SourceModule[_, _] =>
+ Local
+ case sink: SinkModule[_, _] =>
+ Local
+ case remaining: Module =>
+ remaining.shape match {
+ case sourceShape: SourceShape[_] =>
+ Local
+ case sinkShape: SinkShape[_] =>
+ Local
+ case otherShapes: Shape =>
+ Remote
+ }
+ }
+
+ /**
+ * [[BaseStrategy]] first; the fallback keeps materialized-value and single-element
+ * sources Local and sends everything else Remote.
+ */
+ val AllRemoteStrategy: Strategy = BaseStrategy orElse {
+ case graphStageModule: GraphStageModule =>
+ graphStageModule.stage match {
+ case matValueSource: MaterializedValueSource[_] =>
+ Local
+ case singleSource: SingleSource[_] =>
+ Local
+ case _ =>
+ Remote
+ }
+ case _ =>
+ Remote
+ }
+
+ /**
+ * Will decide whether to render a module locally or remotely
+ * based on Attribute settings.
+ *
+ * [[BaseStrategy]] is tried first; the attribute lookup is only the fallback.
+ */
+ val TagAttributeStrategy: Strategy = BaseStrategy orElse {
+ case module =>
+ GearAttributes.location(module.attributes)
+ }
+
+ /**
+ * [[BaseStrategy]] first; the fallback places every module Local
+ * (both branches of the stage match return Local).
+ */
+ val AllLocalStrategy: Strategy = BaseStrategy orElse {
+ case graphStageModule: GraphStageModule =>
+ // TODO kasravi review
+ graphStageModule.stage match {
+ case matValueSource: MaterializedValueSource[_] =>
+ Local
+ case _ =>
+ Local
+ }
+ case _ =>
+ Local
+ }
+
+ /** Creates a [[GraphPartitioner]] that uses the given strategy. */
+ def apply(strategy: Strategy): GraphPartitioner = {
+ new GraphPartitioner(strategy)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
new file mode 100644
index 0000000..fe86951
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.actor.ActorSystem
+import akka.stream.impl.Stages.DefaultAttributes
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.{PublisherSource, SubscriberSink}
+import akka.stream.{SinkShape, SourceShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.materializer.LocalMaterializerImpl
+import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule}
+import org.apache.gearpump.util.Graph
+import org.reactivestreams.{Publisher, Subscriber}
+
+/**
+ *
+ * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph, which only
+ * contains modules that can be materialized in the local JVM.
+ *
+ * @param graph the modules and connecting edges that make up this sub-graph
+ */
+class LocalGraph(override val graph: Graph[Module, Edge]) extends SubGraph
+
+object LocalGraph {
+
+  /**
+   * Materializes a [[LocalGraph]] inside the local JVM.
+   *
+   * @param system ActorSystem used to create the local materializer
+   */
+  class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer {
+
+    // the local materializer every sub-graph is delegated to
+    val materializer = LocalMaterializerImpl()(system)
+
+    /**
+     * Replaces every bridge module with a reactive-streams endpoint taken from
+     * `matValues`, then hands the rewritten graph to the local materializer.
+     *
+     * A SourceBridgeModule becomes a SubscriberSink around the Subscriber stored
+     * for it, a SinkBridgeModule becomes a PublisherSource around the Publisher
+     * stored for it; all other modules are kept as they are.
+     *
+     * @param graph sub-graph to materialize
+     * @param matValues Materialized Values for each module before materialization
+     * @return Materialized Values for each Module after the materialization.
+     */
+    override def materialize(graph: SubGraph,
+        matValues: scala.collection.mutable.Map[Module, Any]):
+      scala.collection.mutable.Map[Module, Any] = {
+      val bridged: Graph[Module, Edge] = graph.graph.mapVertex {
+        case bridgeIn: SourceBridgeModule[in, out] =>
+          val downstream = matValues(bridgeIn).asInstanceOf[Subscriber[in]]
+          val sinkShape: SinkShape[in] = SinkShape(bridgeIn.inPort)
+          new SubscriberSink(downstream, DefaultAttributes.subscriberSink, sinkShape)
+        case bridgeOut: SinkBridgeModule[in, out] =>
+          val upstream = matValues(bridgeOut).asInstanceOf[Publisher[out]]
+          val sourceShape: SourceShape[out] = SourceShape(bridgeOut.outPort)
+          new PublisherSource(upstream, DefaultAttributes.publisherSource, sourceShape)
+        case untouched =>
+          untouched
+      }
+      materializer.materialize(bridged, matValues)
+    }
+
+    /** Shuts down the underlying local materializer. */
+    override def shutdown: Unit = materializer.shutdown()
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
new file mode 100644
index 0000000..99ebe17
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.materializer.RemoteMaterializerImpl
+import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule}
+import org.apache.gearpump.akkastream.task.SinkBridgeTask.SinkBridgeTaskClient
+import org.apache.gearpump.akkastream.task.SourceBridgeTask.SourceBridgeTaskClient
+import akka.stream.impl.StreamLayout.Module
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.util.Graph
+
+/**
+ *
+ * [[RemoteGraph]] is a [[SubGraph]] of the application DSL Graph, which only
+ * contains modules that can be materialized in a remote Gearpump cluster.
+ *
+ * @param graph the modules and connecting edges that make up this sub-graph
+ */
+class RemoteGraph(override val graph: Graph[Module, Edge]) extends SubGraph
+
+object RemoteGraph {
+
+  /**
+   * Materializes a [[RemoteGraph]] on a Gearpump cluster.
+   *
+   * @param useInProcessCluster when true an EmbeddedCluster is started inside this
+   *                            process and used as the target cluster; otherwise the
+   *                            ClientContext is created from `system`
+   * @param system ActorSystem
+   */
+  class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem)
+    extends SubGraphMaterializer {
+    // embedded cluster owned by this materializer, only present when requested
+    private val local = if (useInProcessCluster) {
+      val cluster = EmbeddedCluster()
+      cluster.start()
+      Some(cluster)
+    } else {
+      None
+    }
+
+    // client through which applications are submitted
+    private val context: ClientContext =
+      local.map(_.newClientContext).getOrElse(ClientContext(system))
+
+    /**
+     * Materializes the sub-graph on the cluster; an empty graph is a no-op.
+     *
+     * @param subGraph sub-graph to materialize
+     * @param inputMatValues Materialized Values for each module before materialization
+     * @return `inputMatValues` itself for an empty graph, otherwise the input values
+     *         extended with a bridge client for every bridge module
+     */
+    override def materialize(subGraph: SubGraph,
+        inputMatValues: scala.collection.mutable.Map[Module, Any]):
+      scala.collection.mutable.Map[Module, Any] = {
+      val graph = subGraph.graph
+
+      if (graph.isEmpty) {
+        inputMatValues
+      } else {
+        doMaterialize(graph, inputMatValues)
+      }
+    }
+
+    /**
+     * Builds the application with RemoteMaterializerImpl, submits it, waits five
+     * seconds and then creates the bridge clients for the submitted application.
+     */
+    private def doMaterialize(graph: Graph[Module, Edge],
+        inputMatValues: scala.collection.mutable.Map[Module, Any]):
+      scala.collection.mutable.Map[Module, Any] = {
+      val materializer = new RemoteMaterializerImpl(graph, system)
+      val (app, matValues) = materializer.materialize
+
+      val appId = context.submit(app).appId
+      // NOTE(review): readiness is approximated with a fixed sleep, not a signal
+      // from the cluster
+      // scalastyle:off println
+      println("sleep 5 second until the application is ready on cluster")
+      // scalastyle:on println
+      Thread.sleep(5000)
+
+      // only bridge modules get a materialized value: a client bound to the
+      // processor that runs the bridge task in the submitted application
+      def resolve(matValues: Map[Module, ProcessorId]): Map[Module, Any] = {
+        matValues.toList.flatMap { kv =>
+          val (module, processorId) = kv
+          module match {
+            case _: SourceBridgeModule[_, _] =>
+              val bridge = new SourceBridgeTaskClient[AnyRef](system.dispatcher,
+                context, appId, processorId)
+              Some((module, bridge))
+            case _: SinkBridgeModule[_, _] =>
+              val bridge = new SinkBridgeTaskClient(system, context, appId, processorId)
+              Some((module, bridge))
+            case _ =>
+              None
+          }
+        }.toMap
+      }
+
+      inputMatValues ++ resolve(matValues)
+    }
+
+    /**
+     * Closes the client context and stops the embedded cluster, if there is one.
+     * The cluster is stopped even when closing the context throws.
+     */
+    override def shutdown: Unit = {
+      try {
+        context.close()
+      } finally {
+        local.foreach(_.stop())
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
new file mode 100644
index 0000000..a74143e
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import akka.stream.impl.StreamLayout.Module
+import org.apache.gearpump.util.Graph
+
+/**
+ * [[SubGraph]] is a partial DAG
+ *
+ * The idea is that by dividing a [[Graph]] into several
+ * [[SubGraph]]s, we can materialize each [[SubGraph]] with a different
+ * materializer.
+ */
+
+trait SubGraph {
+
+ /**
+ * the [[Graph]] representation of this SubGraph
+ * @return the modules of this sub-graph and the edges connecting them
+ */
+ def graph: Graph[Module, Edge]
+}
+
+
+/**
+ * Materializer for Sub-Graph type
+ */
+trait SubGraphMaterializer {
+ /**
+ * Materializes the given sub-graph.
+ *
+ * @param graph the sub-graph to materialize
+ * @param matValues Materialized Values for each module before materialization
+ * @return Materialized Values for each Module after the materialization.
+ */
+
+ def materialize(graph: SubGraph,
+ matValues: scala.collection.mutable.Map[Module, Any]):
+ scala.collection.mutable.Map[Module, Any]
+
+ def shutdown: Unit // implementations shut down what they own (materializer, cluster client)
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
new file mode 100644
index 0000000..477f4d3
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.materializer
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.{util => ju}
+
+import org.apache.gearpump.util.{Graph => GGraph}
+import akka.actor.{ActorRef, ActorSystem, Cancellable, Deploy, PoisonPill}
+import akka.dispatch.Dispatchers
+import akka.event.{Logging, LoggingAdapter}
+import akka.stream.impl.StreamLayout._
+import akka.stream.impl._
+import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
+import akka.stream.impl.fusing.{ActorGraphInterpreter, Fold, GraphInterpreterShell, GraphModule, GraphStageModule}
+import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
+import akka.stream.scaladsl.ModuleExtractor
+import akka.stream.{ClosedShape, Graph => AkkaGraph, _}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.module.ReduceModule
+import org.apache.gearpump.akkastream.util.MaterializedValueOps
+import org.reactivestreams.{Publisher, Subscriber}
+
+import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * This materializer is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]]
+ *
+ * Besides the Akka `materialize` overloads for runnable graphs it can materialize a
+ * Gearpump Graph[Module, Edge] directly, one atomic module at a time, see
+ * materialize(graph, inputMatValues).
+ *
+ * @param system ActorSystem providing the scheduler used for scheduled tasks
+ * @param settings ActorMaterializerSettings, the base that effectiveSettings refines
+ * @param dispatchers Dispatchers used to look up the execution context
+ * @param supervisor ActorRef that is sent a PoisonPill by shutdown()
+ * @param haveShutDown AtomicBoolean flipped to true by the first shutdown() call
+ * @param flowNames SeqActorName producing the flow name that prefixes stage names
+ */
+case class LocalMaterializerImpl (
+ override val system: ActorSystem,
+ override val settings: ActorMaterializerSettings,
+ dispatchers: Dispatchers,
+ override val supervisor: ActorRef,
+ haveShutDown: AtomicBoolean,
+ flowNames: SeqActorName)
+ extends ExtendedActorMaterializer {
+
+ override def logger: LoggingAdapter = Logging.getLogger(system, this)
+
+ override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration,
+ task: Runnable): Cancellable =
+ system.scheduler.schedule(initialDelay, interval, task)(executionContext)
+
+ override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
+ system.scheduler.scheduleOnce(delay, task)(executionContext)
+
+ override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
+ import ActorAttributes._
+ import Attributes._
+ opAttr.attributeList.foldLeft(settings) { (s, attr) => // start from the base settings
+ attr match {
+ case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
+ case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
+ case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
+ case l: LogLevels => s
+ case Name(_) => s
+ case other => s // every remaining attribute leaves the settings unchanged
+ }
+ }
+ }
+
+ override def shutdown(): Unit =
+ if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill // only the first call
+
+ override def isShutdown: Boolean = haveShutDown.get()
+
+ override lazy val executionContext: ExecutionContextExecutor =
+ dispatchers.lookup(settings.dispatcher match {
+ case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId // fall back to default
+ case other => other
+ })
+
+
+ case class LocalMaterializerSession(module: Module, iAttributes: Attributes,
+ subflowFuser: GraphInterpreterShell => ActorRef = null)
+ extends MaterializerSession(module, iAttributes) {
+
+ override def materializeAtomic(atomic: AtomicModule,
+ effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {
+
+ def newMaterializationContext() =
+ new MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes,
+ stageName(effectiveAttributes))
+ atomic match { // no default case: any other AtomicModule raises a MatchError
+ case sink: SinkModule[_, _] =>
+ val (sub, mat) = sink.create(newMaterializationContext())
+ assignPort(sink.shape.in, sub.asInstanceOf[Subscriber[Any]])
+ matVal.put(atomic, mat)
+ case source: SourceModule[_, _] =>
+ val (pub, mat) = source.create(newMaterializationContext())
+ assignPort(source.shape.out, pub.asInstanceOf[Publisher[Any]])
+ matVal.put(atomic, mat)
+ case stage: ProcessorModule[_, _, _] =>
+ val (processor, mat) = stage.createProcessor()
+ assignPort(stage.inPort, processor) // the processor serves as subscriber ...
+ assignPort(stage.outPort, processor.asInstanceOf[Publisher[Any]]) // ... and publisher
+ matVal.put(atomic, mat)
+ // FIXME
+ // case tls: TlsModule =>
+ // TODO solve this so TlsModule doesn't need special treatment here
+ // val es = effectiveSettings(effectiveAttributes)
+ // val props =
+ // TLSActor.props(es, tls.sslContext, tls.sslConfig,
+ // tls.firstSession, tls.role, tls.closing, tls.hostInfo)
+ // val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
+ // def factory(id: Int) = new ActorPublisher[Any](impl) {
+ // override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
+ // }
+ // val publishers = Vector.tabulate(2)(factory)
+ // impl ! FanOut.ExposedPublishers(publishers)
+ //
+ // assignPort(tls.plainOut, publishers(TLSActor.UserOut))
+ // assignPort(tls.cipherOut, publishers(TLSActor.TransportOut))
+ //
+ // assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn))
+ // assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, TLSActor.TransportIn))
+ //
+ // matVal.put(atomic, NotUsed)
+ case graph: GraphModule =>
+ matGraph(graph, effectiveAttributes, matVal)
+ case stage: GraphStageModule => // wrap the single stage in a GraphModule of its own
+ val graph =
+ GraphModule(GraphAssembly(stage.shape.inlets, stage.shape.outlets, stage.stage),
+ stage.shape, stage.attributes, Array(stage))
+ matGraph(graph, effectiveAttributes, matVal)
+ }
+ }
+
+ private def matGraph(graph: GraphModule, effectiveAttributes: Attributes,
+ matVal: ju.Map[Module, Any]): Unit = {
+ val calculatedSettings = effectiveSettings(effectiveAttributes)
+ val (handlers, logics) =
+ graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc)
+
+ val shell = new GraphInterpreterShell(graph.assembly, handlers,
+ logics, graph.shape, calculatedSettings, LocalMaterializerImpl.this)
+
+ val impl = // actor running the shell: supplied by the fuser or a new interpreter actor
+ if (subflowFuser != null && !effectiveAttributes.contains(Attributes.AsyncBoundary)) {
+ subflowFuser(shell)
+ } else {
+ val props = ActorGraphInterpreter.props(shell)
+ actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher)
+ }
+
+ for ((inlet, i) <- graph.shape.inlets.iterator.zipWithIndex) {
+ val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, shell, i)
+ assignPort(inlet, subscriber)
+ }
+ for ((outlet, i) <- graph.shape.outlets.iterator.zipWithIndex) {
+ val publisher = new ActorGraphInterpreter.BoundaryPublisher(impl, shell, i)
+ impl ! ActorGraphInterpreter.ExposedPublisher(shell, i, publisher)
+ assignPort(outlet, publisher)
+ }
+ }
+ }
+
+ override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): Mat = {
+
+ LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get,
+ null, null).materialize().asInstanceOf[Mat] // no initial attributes, no subflow fuser
+
+ }
+
+ override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+ initialAttributes: Attributes): Mat = {
+ materialize(runnableGraph) // NOTE(review): initialAttributes is ignored - confirm
+ }
+
+ override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+ subflowFuser: GraphInterpreterShell => ActorRef): Mat = {
+
+ LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get,
+ null, null).materialize().asInstanceOf[Mat] // NOTE(review): subflowFuser is not passed on
+
+ }
+
+ override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+ subflowFuser: (GraphInterpreterShell) => ActorRef, initialAttributes: Attributes): Mat = {
+ materialize(runnableGraph) // NOTE(review): both extra arguments are ignored - confirm
+ }
+
+ override def makeLogger(logSource: Class[_]): LoggingAdapter = {
+ logger // logSource is not used, the materializer's own logger is returned
+ }
+
+ def buildToplevelModule(graph: GGraph[Module, Edge]): Module = {
+ var moduleInProgress: Module = EmptyModule
+ graph.vertices.foreach(module => { // first compose all vertices ...
+ moduleInProgress = moduleInProgress.compose(module)
+ })
+ graph.edges.foreach(value => { // ... then wire the ports named by every edge
+ val (node1, edge, node2) = value
+ moduleInProgress = moduleInProgress.wire(edge.from, edge.to)
+ })
+
+ moduleInProgress
+ }
+
+ def materialize(graph: GGraph[Module, Edge],
+ inputMatValues: scala.collection.mutable.Map[Module, Any]):
+ scala.collection.mutable.Map[Module, Any] = {
+ val topLevelModule = buildToplevelModule(graph)
+ val session = LocalMaterializerSession(topLevelModule, null, null)
+ import scala.collection.JavaConverters._
+ val matV = inputMatValues.asJava // Java view handed to materializeAtomic
+ val materializedGraph = graph.mapVertex { module =>
+ session.materializeAtomic(module.asInstanceOf[AtomicModule], module.attributes, matV)
+ matV.get(module) // the vertex becomes the value now stored for the module
+ }
+ materializedGraph.edges.foreach { nodeEdgeNode =>
+ val (node1, edge, node2) = nodeEdgeNode
+ val from = edge.from
+ val to = edge.to
+ node1 match { // only edges whose two end vertices are Modules are connected
+ case module1: Module =>
+ node2 match {
+ case module2: Module =>
+ val publisher = module1.downstreams(from).asInstanceOf[Publisher[Any]]
+ val subscriber = module2.upstreams(to).asInstanceOf[Subscriber[Any]]
+ publisher.subscribe(subscriber)
+ case _ =>
+ }
+ case _ =>
+ }
+ }
+ val matValSources = graph.vertices.flatMap(module => { // collect MaterializedValueSources
+ val rt: Option[MaterializedValueSource[_]] = module match {
+ case graphStage: GraphStageModule =>
+ graphStage.stage match {
+ case materializedValueSource: MaterializedValueSource[_] =>
+ Some(materializedValueSource)
+ case _ =>
+ None
+ }
+ case _ =>
+ None
+ }
+ rt
+ })
+ publishToMaterializedValueSource(matValSources, inputMatValues)
+ inputMatValues // the map passed in is also the result
+ }
+
+ private def publishToMaterializedValueSource(modules: List[MaterializedValueSource[_]],
+ matValues: scala.collection.mutable.Map[Module, Any]): Unit = {
+ modules.foreach { source =>
+ Option(source.computation).map { attr => // null computation is skipped; map result unused
+ val valueToPublish = MaterializedValueOps(attr).resolve(matValues)
+ source.setValue(valueToPublish)
+ }
+ }
+ }
+
+ private[this] def createFlowName(): String = flowNames.next()
+
+ val flowName = createFlowName() // taken once per materializer instance
+ var nextId = 0 // counter used by stageName; plain var without synchronization
+
+ def stageName(attr: Attributes): String = {
+ val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
+ nextId += 1
+ name
+ }
+
+ override def withNamePrefix(name: String): LocalMaterializerImpl =
+ this.copy(flowNames = flowNames.copy(name)) // same materializer state, renamed flows
+
+}
+
+object LocalMaterializerImpl {
+  /**
+   * Bundles a module with its materialized value and two per-port maps of
+   * subscribers and publishers, both empty by default.
+   */
+  case class MaterializedModule(module: Module, matValue: Any,
+      inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]],
+      outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])
+
+  /**
+   * Creates a materializer from optional settings and an optional name prefix.
+   *
+   * @param materializerSettings settings to use; ActorMaterializerSettings(system) when None
+   * @param namePrefix prefix for flow names; "flow" when None
+   * @param system ActorSystem the materializer is created for
+   */
+  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
+      namePrefix: Option[String] = None)(implicit system: ActorSystem):
+    LocalMaterializerImpl = {
+
+    val chosenSettings = materializerSettings.getOrElse(ActorMaterializerSettings(system))
+    val chosenPrefix = namePrefix.getOrElse("flow")
+    apply(chosenSettings, chosenPrefix)(system)
+  }
+
+  /**
+   * Creates a materializer together with its stream supervisor actor; the
+   * supervisor and the materializer share one shutdown flag.
+   *
+   * @param materializerSettings settings for the materializer and its supervisor
+   * @param namePrefix prefix for flow names
+   * @param system ActorSystem the supervisor actor is created in
+   */
+  def apply(materializerSettings: ActorMaterializerSettings,
+      namePrefix: String)(implicit system: ActorSystem): LocalMaterializerImpl = {
+    val shutdownFlag = new AtomicBoolean(false)
+    val supervisorProps = StreamSupervisor.props(materializerSettings, shutdownFlag)
+      .withDispatcher(materializerSettings.dispatcher)
+
+    new LocalMaterializerImpl(
+      system,
+      materializerSettings,
+      system.dispatchers,
+      system.actorOf(supervisorProps),
+      shutdownFlag,
+      FlowNames(system).name.copy(namePrefix))
+  }
+
+  /**
+   * Turns a reduce module into a Fold stage that starts from null: while the
+   * accumulator is still null the incoming element becomes the accumulator,
+   * afterwards the module's function combines accumulator and element.
+   *
+   * @param reduce module whose function `f` is used to combine elements
+   */
+  def toFoldModule(reduce: ReduceModule[Any]): Fold[Any, Any] = {
+    val combine = reduce.f
+    Fold(null, (acc: Any, elem: Any) => if (acc == null) elem else combine(acc, elem))
+  }
+}