You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/11 02:33:20 UTC
[3/5] incubator-gearpump git commit: Fixes #22 support akka-streams
Gearpump Materializer
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
new file mode 100644
index 0000000..75dc95a
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import akka.NotUsed
+import akka.actor.{ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, ExtendedActorSystem}
+import akka.event.{Logging, LoggingAdapter}
+import akka.stream.Attributes.Attribute
+import akka.stream._
+import akka.stream.impl.Stages.SymbolicGraphStage
+import akka.stream.impl.StreamLayout._
+import akka.stream.impl._
+import akka.stream.impl.fusing.{GraphInterpreterShell, GraphStageModule}
+import akka.stream.scaladsl.ModuleExtractor
+import akka.stream.stage.GraphStage
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
+import org.apache.gearpump.akkastream.graph.LocalGraph.LocalGraphMaterializer
+import org.apache.gearpump.akkastream.graph.RemoteGraph.RemoteGraphMaterializer
+import org.apache.gearpump.akkastream.graph._
+import org.apache.gearpump.akkastream.util.MaterializedValueOps
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContextExecutor, Promise}
+import scala.concurrent.duration.FiniteDuration
+
+object GearpumpMaterializer {
+
+ final val Debug = true
+
+ final case class Edge(from: OutPort, to: InPort)
+
+ final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode) extends Attribute
+
+ implicit def boolToAtomic(bool: Boolean): AtomicBoolean = new AtomicBoolean(bool)
+
+ def apply(strategy: Strategy)(implicit context: ActorRefFactory):
+ ExtendedActorMaterializer = {
+ val system = actorSystemOf(context)
+
+ apply(ActorMaterializerSettings(
+ system).withAutoFusing(false), strategy, useLocalCluster = false, "flow")(context)
+ }
+
+ def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
+ strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
+ useLocalCluster: Boolean = true,
+ namePrefix: Option[String] = None)(implicit context: ActorRefFactory):
+ ExtendedActorMaterializer = {
+ val system = actorSystemOf(context)
+
+ val settings = materializerSettings getOrElse
+ ActorMaterializerSettings(system).withAutoFusing(false)
+ apply(settings, strategy, useLocalCluster, namePrefix.getOrElse("flow"))(context)
+ }
+
+ def apply(materializerSettings: ActorMaterializerSettings,
+ strategy: Strategy,
+ useLocalCluster: Boolean,
+ namePrefix: String)(implicit context: ActorRefFactory):
+ ExtendedActorMaterializer = {
+ val system = actorSystemOf(context)
+
+ new GearpumpMaterializer(
+ system,
+ materializerSettings,
+ context.actorOf(
+ StreamSupervisor.props(materializerSettings, false).withDispatcher(
+ materializerSettings.dispatcher), StreamSupervisor.nextName()))
+ }
+
+
+ private def actorSystemOf(context: ActorRefFactory): ActorSystem = {
+ val system = context match {
+ case s: ExtendedActorSystem => s
+ case c: ActorContext => c.system
+ case null => throw new IllegalArgumentException("ActorRefFactory context must be defined")
+ case _ =>
+ throw new IllegalArgumentException(
+ s"""
+ | context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]
+ """.stripMargin
+ )
+ }
+ system
+ }
+
+}
+
+/**
+ *
+ * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump
+ * streaming application. If some module cannot be rendered remotely in Gearpump
+ * Cluster, then it will use local Actor materializer as fallback to materialize
+ * the module locally.
+ *
+ * User can customize a [[org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy]]
+ * to determine which module should be rendered
+ * remotely, and which module should be rendered locally.
+ *
+ * @see [[org.apache.gearpump.akkastream.graph.GraphPartitioner]]
+ * to find out how we cut the runnableGraph to two parts,
+ * and materialize them separately.
+ * @param system ActorSystem
+ * @param strategy Strategy
+ * @param useLocalCluster whether to use built-in in-process local cluster
+ */
+class GearpumpMaterializer(override val system: ActorSystem,
+ override val settings: ActorMaterializerSettings,
+ override val supervisor: ActorRef,
+ strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
+ useLocalCluster: Boolean = true, namePrefix: Option[String] = None)
+ extends ExtendedActorMaterializer {
+
+ private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map(
+ classOf[LocalGraph] -> new LocalGraphMaterializer(system),
+ classOf[RemoteGraph] -> new RemoteGraphMaterializer(useLocalCluster, system)
+ )
+
+ override def logger: LoggingAdapter = Logging.getLogger(system, this)
+
+ override def isShutdown: Boolean = system.isTerminated
+
+ override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
+ import ActorAttributes._
+ import Attributes._
+ opAttr.attributeList.foldLeft(settings) { (s, attr) =>
+ attr match {
+ case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
+ case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
+ case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
+ case _ => s
+ }
+ }
+ }
+
+ override def withNamePrefix(name: String): ExtendedActorMaterializer =
+ throw new UnsupportedOperationException()
+
+ override implicit def executionContext: ExecutionContextExecutor =
+ throw new UnsupportedOperationException()
+
+ override def schedulePeriodically(initialDelay: FiniteDuration,
+ interval: FiniteDuration,
+ task: Runnable): Cancellable =
+ system.scheduler.schedule(initialDelay, interval, task)(executionContext)
+
+ override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
+ system.scheduler.scheduleOnce(delay, task)(executionContext)
+
+ override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
+ val initialAttributes = Attributes(
+ Attributes.InputBuffer(
+ settings.initialInputBufferSize,
+ settings.maxInputBufferSize
+ ) ::
+ ActorAttributes.Dispatcher(settings.dispatcher) ::
+ ActorAttributes.SupervisionStrategy(settings.supervisionDecider) ::
+ Nil)
+
+ val info = Fusing.aggressive(runnableGraph).module.info
+ import _root_.org.apache.gearpump.util.{Graph => GGraph}
+ val graph = GGraph.empty[Module, Edge]
+
+ info.allModules.foreach(module => {
+ if (module.isCopied) {
+ val original = module.asInstanceOf[CopiedModule].copyOf
+ graph.addVertex(original)
+ module.shape.outlets.zip(original.shape.outlets).foreach(out => {
+ val (cout, oout) = out
+ val cin = info.downstreams(cout)
+ val downStreamModule = info.inOwners(cin)
+ if(downStreamModule.isCopied) {
+ val downStreamOriginal = downStreamModule.asInstanceOf[CopiedModule].copyOf
+ downStreamModule.shape.inlets.zip(downStreamOriginal.shape.inlets).foreach(in => {
+ in._1 == cin match {
+ case true =>
+ val oin = in._2
+ graph.addEdge(original, Edge(oout, oin), downStreamOriginal)
+ case false =>
+ }
+ })
+ }
+ })
+ }
+ })
+
+ if(Debug) {
+ val iterator = graph.topologicalOrderIterator
+ while (iterator.hasNext) {
+ val module = iterator.next()
+ // scalastyle:off println
+ module match {
+ case graphStageModule: GraphStageModule =>
+ graphStageModule.stage match {
+ case symbolicGraphStage: SymbolicGraphStage[_, _, _] =>
+ val symbolicName = symbolicGraphStage.symbolicStage.getClass.getSimpleName
+ println(
+ s"${module.getClass.getSimpleName}(${symbolicName})"
+ )
+ case graphStage: GraphStage[_] =>
+ val name = graphStage.getClass.getSimpleName
+ println(
+ s"${module.getClass.getSimpleName}(${name})"
+ )
+ case other =>
+ println(
+ s"${module.getClass.getSimpleName}(${other.getClass.getSimpleName})"
+ )
+ }
+ case _ =>
+ println(module.getClass.getSimpleName)
+ }
+ // scalastyle:on println
+ }
+ }
+
+ val subGraphs = GraphPartitioner(strategy).partition(graph)
+ val matValues = subGraphs.foldLeft(mutable.Map.empty[Module, Any]) { (map, subGraph) =>
+ val materializer = subMaterializers(subGraph.getClass)
+ map ++ materializer.materialize(subGraph, map)
+ }
+ val mat = matValues.flatMap(pair => {
+ val (module, any) = pair
+ any match {
+ case notUsed: NotUsed =>
+ None
+ case others =>
+ val rt = module.shape match {
+ case sink: SinkShape[_] =>
+ Some(any)
+ case _ =>
+ None
+ }
+ rt
+ }
+ }).toList
+ val matModule = subGraphs.last.graph.topologicalOrderIterator.toList.last
+ val mat2 = resolveMaterialized(matModule.materializedValueComputation, matValues)
+ val rt = Some(mat).flatMap(any => {
+ any match {
+ case promise: Promise[_] =>
+ Some(promise.future)
+ case other =>
+ Some(other)
+ }
+ })
+ rt.getOrElse(null).asInstanceOf[Mat]
+ }
+
+ override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
+ subflowFuser: GraphInterpreterShell => ActorRef): Mat = {
+ materialize(runnableGraph)
+ }
+
+ def shutdown: Unit = {
+ subMaterializers.values.foreach(_.shutdown)
+ }
+
+ private def resolveMaterialized(mat: MaterializedValueNode,
+ materializedValues: mutable.Map[Module, Any]): Any = mat match {
+ case Atomic(m) =>
+ materializedValues.getOrElse(m, ())
+ case Combine(f, d1, d2) =>
+ f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
+ case Transform(f, d) =>
+ f(resolveMaterialized(d, materializedValues))
+ case Ignore =>
+ ()
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
new file mode 100644
index 0000000..871dcf8
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import java.{util => ju}
+
+import _root_.org.apache.gearpump.util.{Graph => GGraph}
+import akka.actor.ActorSystem
+import akka.stream._
+import org.apache.gearpump.akkastream.GearpumpMaterializer.{Edge, MaterializedValueSourceAttribute}
+import akka.stream.impl.StreamLayout._
+import akka.stream.impl._
+import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
+
+class GearpumpMaterializerSession(system: ActorSystem, topLevel: Module,
+ initialAttributes: Attributes, namePrefix: Option[String] = None)
+ extends MaterializerSession(topLevel, initialAttributes) {
+
+ private[this] def createFlowName(): String =
+ FlowNames(system).name.copy(namePrefix.getOrElse("flow")).next()
+
+ private val flowName = createFlowName()
+ private var nextId = 0
+
+ private def stageName(attr: Attributes): String = {
+ val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
+ nextId += 1
+ name
+ }
+
+ val graph = GGraph.empty[Module, Edge]
+
+ def addEdge(publisher: (OutPort, Module), subscriber: (InPort, Module)): Unit = {
+ graph.addEdge(publisher._2, Edge(publisher._1, subscriber._1), subscriber._2)
+ }
+
+ def addVertex(module: Module): Unit = {
+ graph.addVertex(module)
+ }
+
+ override def materializeModule(module: Module, parentAttributes: Attributes): Any = {
+
+ val materializedValues: ju.Map[Module, Any] = new ju.HashMap
+ val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
+
+ val materializedValueSources = List.empty[MaterializedValueSource[_]]
+
+ for (submodule <- module.subModules) {
+ submodule match {
+ case atomic: AtomicModule =>
+ materializeAtomic(atomic, currentAttributes, materializedValues)
+ case copied: CopiedModule =>
+ enterScope(copied)
+ materializedValues.put(copied, materializeModule(copied, currentAttributes))
+ exitScope(copied)
+ case composite =>
+ materializedValues.put(composite, materializeComposite(composite, currentAttributes))
+ case EmptyModule =>
+ }
+ }
+
+ val mat = resolveMaterialized(module.materializedValueComputation, materializedValues)
+
+ materializedValueSources.foreach { module =>
+ val matAttribute =
+ new MaterializedValueSourceAttribute(mat.asInstanceOf[MaterializedValueNode])
+ val copied = copyAtomicModule(module.module, parentAttributes
+ and Attributes(matAttribute))
+ // TODO
+ // assignPort(module.shape.out, (copied.shape.outlets.head, copied))
+ addVertex(copied)
+ materializedValues.put(copied, Atomic(copied))
+ }
+ mat
+
+ }
+
+ override protected def materializeComposite(composite: Module,
+ effectiveAttributes: Attributes): Any = {
+ materializeModule(composite, effectiveAttributes)
+ }
+
+ protected def materializeAtomic(atomic: AtomicModule,
+ parentAttributes: Attributes,
+ matVal: ju.Map[Module, Any]): Unit = {
+
+ val (inputs, outputs) = (atomic.shape.inlets, atomic.shape.outlets)
+ val copied = copyAtomicModule(atomic, parentAttributes)
+
+ for ((in, id) <- inputs.zipWithIndex) {
+ val inPort = inPortMapping(atomic, copied)(in)
+ // assignPort(in, (inPort, copied))
+ }
+
+ for ((out, id) <- outputs.zipWithIndex) {
+ val outPort = outPortMapping(atomic, copied)(out)
+ // TODO
+ // assignPort(out, (outPort, copied))
+ }
+
+ addVertex(copied)
+ matVal.put(atomic, Atomic(copied))
+ }
+
+ private def copyAtomicModule[T <: Module](module: T, parentAttributes: Attributes): T = {
+ val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
+ module.withAttributes(currentAttributes).asInstanceOf[T]
+ }
+
+ private def outPortMapping(from: Module, to: Module): Map[OutPort, OutPort] = {
+ from.shape.outlets.iterator.zip(to.shape.outlets.iterator).toList.toMap
+ }
+
+ private def inPortMapping(from: Module, to: Module): Map[InPort, InPort] = {
+ from.shape.inlets.iterator.zip(to.shape.inlets.iterator).toList.toMap
+ }
+
+ protected def resolveMaterialized(matNode: MaterializedValueNode,
+ materializedValues: ju.Map[Module, Any]):
+ Any =
+ matNode match {
+ case Atomic(m) => materializedValues.get(m)
+ case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues),
+ resolveMaterialized(d2, materializedValues))
+ case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
+ case Ignore => Ignore
+ }
+}
+
+object GearpumpMaterializerSession {
+ def apply(system: ActorSystem, topLevel: Module,
+ initialAttributes: Attributes, namePrefix: Option[String] = None):
+ GearpumpMaterializerSession = {
+ new GearpumpMaterializerSession(system, topLevel, initialAttributes, namePrefix)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
new file mode 100644
index 0000000..2ce4e19
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl.{Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * Source and Sink are materialized locally.
+ * Remaining GraphStages are materialized remotely:
+ * statefulMap, filter, fold, flatMap
+ */
+object Test extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ implicit val system = ActorSystem("Test", akkaConf)
+ implicit val materializer = GearpumpMaterializer()
+
+ val echo = system.actorOf(Props(new Echo()))
+ val sink = Sink.actorRef(echo, "COMPLETE")
+
+ Source(
+ List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+ ).filter(_.startsWith("red")).fold("Items:") {(a, b) =>
+ a + "|" + b
+ }.map("I want to order item: " + _).runWith(sink)
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ class Echo extends Actor {
+ def receive: Receive = {
+ case any: AnyRef =>
+ println("Confirm received: " + any)
+ }
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
new file mode 100644
index 0000000..71678c3
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.{ClosedShape, ThrottleMode}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Stream example showing Conflate, Throttle
+ */
+object Test10 extends AkkaApp with ArgumentsParser {
+
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ import akka.actor.ActorSystem
+ import akka.stream.scaladsl._
+
+ implicit val system = ActorSystem("Test10", akkaConfig)
+ implicit val materializer = GearpumpMaterializer()
+ implicit val ec = system.dispatcher
+
+ // Conflate[A] - (2 inputs, 1 output) concatenates two streams
+ // (first consumes one, then the second one)
+ def stream(x: String) = Stream.continually(x)
+
+ val sourceA = Source(stream("A"))
+ val sourceB = Source(stream("B"))
+
+ val throttler: Flow[String, String, NotUsed] =
+ Flow[String].throttle(1, 1.second, 1, ThrottleMode.Shaping)
+ val conflateFlow: Flow[String, String, NotUsed] =
+ Flow[String].conflate((x: String, y: String) => x: String)
+ ((acc: String, x: String) => s"$acc::$x")
+
+ val printFlow: Flow[(String, String), String, NotUsed] =
+ Flow[(String, String)].map {
+ x =>
+ println(s" lengths are : ${x._1.length} and ${x._2.length} ; ${x._1} zip ${x._2}")
+ x.toString
+ }
+
+ val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+
+ val zipping = b.add(Zip[String, String]())
+
+ sourceA ~> throttler ~> zipping.in0
+ sourceB ~> conflateFlow ~> zipping.in1
+
+ zipping.out ~> printFlow ~> Sink.ignore
+
+ ClosedShape
+ })
+
+ graph.run()
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
new file mode 100644
index 0000000..b80398c
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.ClosedShape
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Stream example showing Broadcast and Merge
+ */
+object Test11 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ import akka.actor.ActorSystem
+ import akka.stream.scaladsl._
+
+ implicit val system = ActorSystem("Test11", akkaConfig)
+ implicit val materializer = GearpumpMaterializer()
+// implicit val materializer =
+// ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+ implicit val ec = system.dispatcher
+
+ val g = RunnableGraph.fromGraph(GraphDSL.create() {
+ implicit builder: GraphDSL.Builder[NotUsed] =>
+
+ import GraphDSL.Implicits._
+ val in = Source(1 to 10)
+ val output: (Any) => Unit = any => {
+ val s = s"**** $any"
+ println(s)
+ }
+ val out = Sink.foreach(output)
+
+ val broadcast = builder.add(Broadcast[Int](2))
+ val merge = builder.add(Merge[Int](2))
+
+ val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
+
+ in ~> f1 ~> broadcast ~> f2 ~> merge ~> f3 ~> out
+ broadcast ~> f4 ~> merge
+
+ ClosedShape
+ })
+
+ g.run()
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
new file mode 100644
index 0000000..a9e8b08
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.stream.{ClosedShape, UniformFanInShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+
+/**
+ * Partial source, sink example
+ */
+object Test12 extends AkkaApp with ArgumentsParser{
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ import akka.actor.ActorSystem
+ import akka.stream.scaladsl._
+
+ import scala.concurrent.duration._
+
+ implicit val system = ActorSystem("Test12", akkaConfig)
+// implicit val materializer = ActorMaterializer(
+// ActorMaterializerSettings(system).withAutoFusing(false)
+// )
+ implicit val materializer = GearpumpMaterializer()
+ implicit val ec = system.dispatcher
+
+ val pickMaxOfThree = GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+
+ val zip1 = b.add(ZipWith[Int, Int, Int](math.max))
+ val zip2 = b.add(ZipWith[Int, Int, Int](math.max))
+
+ zip1.out ~> zip2.in0
+
+ UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
+ }
+
+ val resultSink = Sink.head[Int]
+
+ val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit b =>
+ sink =>
+ import GraphDSL.Implicits._
+
+ // Importing the partial shape will return its shape (inlets & outlets)
+ val pm3 = b.add(pickMaxOfThree)
+
+ Source.single(1) ~> pm3.in(0)
+ Source.single(2) ~> pm3.in(1)
+ Source.single(3) ~> pm3.in(2)
+
+ pm3.out ~> sink.in
+
+ ClosedShape
+ })
+
+ val max: Future[Int] = g.run()
+ max.map(x => println(s"maximum of three numbers : $x"))
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
new file mode 100644
index 0000000..984c861
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.time._
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.Implicits._
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.Random
+
+/**
+ * GroupBy example
+ */
+
+/*
+// Original example
+val f = Source
+ .tick(0.seconds, 1.second, "")
+ .map { _ =>
+ val now = System.currentTimeMillis()
+ val delay = random.nextInt(8)
+ MyEvent(now - delay * 1000L)
+ }
+ .statefulMapConcat { () =>
+ val generator = new CommandGenerator()
+ ev => generator.forEvent(ev)
+ }
+ .groupBy(64, command => command.w)
+ .takeWhile(!_.isInstanceOf[CloseWindow])
+ .fold(AggregateEventData((0L, 0L), 0))({
+ case (agg, OpenWindow(window)) => agg.copy(w = window)
+ // always filtered out by takeWhile
+ case (agg, CloseWindow(_)) => agg
+ case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
+ })
+ .async
+ .mergeSubstreams
+ .runForeach { agg =>
+ println(agg.toString)
+ }
+ */
+object Test13 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+
+ implicit val system = ActorSystem("Test13", akkaConfig)
+ implicit val materializer = GearpumpMaterializer()
+
+ val random = new Random()
+
+ val result = Source
+ .tick(0.seconds, 1.second, "tick data")
+ .map { _ =>
+ val now = System.currentTimeMillis()
+ val delay = random.nextInt(8)
+ MyEvent(now - delay * 1000L)
+ }
+ .statefulMapConcat { () =>
+ val generator = new CommandGenerator()
+ ev => generator.forEvent(ev)
+ }
+ .groupBy2(command => command.w)
+ .takeWhile(!_.isInstanceOf[CloseWindow])
+ .fold(AggregateEventData((0L, 0L), 0))({
+ case (agg, OpenWindow(window)) => agg.copy(w = window)
+ // always filtered out by takeWhile
+ case (agg, CloseWindow(_)) => agg
+ case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
+ })
+ .runForeach(agg =>
+ println(agg.toString)
+ )
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ case class MyEvent(timestamp: Long)
+
+ type Window = (Long, Long)
+
+ object Window {
+ val WindowLength = 10.seconds.toMillis
+ val WindowStep = 1.second.toMillis
+ val WindowsPerEvent = (WindowLength / WindowStep).toInt
+
+ def windowsFor(ts: Long): Set[Window] = {
+ val firstWindowStart = ts - ts % WindowStep - WindowLength + WindowStep
+ (for (i <- 0 until WindowsPerEvent) yield
+ (firstWindowStart + i * WindowStep,
+ firstWindowStart + i * WindowStep + WindowLength)
+ ).toSet
+ }
+ }
+
+ sealed trait WindowCommand {
+ def w: Window
+ }
+
+ case class OpenWindow(w: Window) extends WindowCommand
+
+ case class CloseWindow(w: Window) extends WindowCommand
+
+ case class AddToWindow(ev: MyEvent, w: Window) extends WindowCommand
+
+ class CommandGenerator {
+ private val MaxDelay = 5.seconds.toMillis
+ private var watermark = 0L
+ private val openWindows = mutable.Set[Window]()
+
+ def forEvent(ev: MyEvent): List[WindowCommand] = {
+ watermark = math.max(watermark, ev.timestamp - MaxDelay)
+ if (ev.timestamp < watermark) {
+ println(s"Dropping event with timestamp: ${tsToString(ev.timestamp)}")
+ Nil
+ } else {
+ val eventWindows = Window.windowsFor(ev.timestamp)
+
+ val closeCommands = openWindows.flatMap { ow =>
+ if (!eventWindows.contains(ow) && ow._2 < watermark) {
+ openWindows.remove(ow)
+ Some(CloseWindow(ow))
+ } else None
+ }
+
+ val openCommands = eventWindows.flatMap { w =>
+ if (!openWindows.contains(w)) {
+ openWindows.add(w)
+ Some(OpenWindow(w))
+ } else None
+ }
+
+ val addCommands = eventWindows.map(w => AddToWindow(ev, w))
+
+ openCommands.toList ++ closeCommands.toList ++ addCommands.toList
+ }
+ }
+ }
+
+ case class AggregateEventData(w: Window, eventCount: Int) {
+ override def toString: String =
+ s"Between ${tsToString(w._1)} and ${tsToString(w._2)}, there were $eventCount events."
+ }
+
+ def tsToString(ts: Long): String = OffsetDateTime
+ .ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
+ .toLocalTime
+ .toString
+ // scalastyle:on println
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
new file mode 100644
index 0000000..0542f43
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.io.File
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl._
+import akka.util.ByteString
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+object Test14 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ implicit val system = ActorSystem("Test14", akkaConf)
+ implicit val materializer = GearpumpMaterializer()
+
+ def lineSink(filename: String): Sink[String, Future[IOResult]] = {
+ Flow[String]
+ .alsoTo(Sink.foreach(s => println(s"$filename: $s")))
+ .map(s => ByteString(s + "\n"))
+ .toMat(FileIO.toPath(new File(filename).toPath))(Keep.right)
+ }
+
+ val source: Source[Int, NotUsed] = Source(1 to 100)
+ val factorials: Source[BigInt, NotUsed] = source.scan(BigInt(1))((acc, next) => acc * next)
+ val sink1 = lineSink("factorial1.txt")
+ val sink2 = lineSink("factorial2.txt")
+ val slowSink2 = Flow[String].via(
+ Flow[String].throttle(1, 1.second, 1, ThrottleMode.shaping)
+ ).toMat(sink2)(Keep.right)
+ val bufferedSink2 = Flow[String].buffer(50, OverflowStrategy.backpressure).via(
+ Flow[String].throttle(1, 1.second, 1, ThrottleMode.shaping)
+ ).toMat(sink2)(Keep.right)
+
+ val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+ val bcast = b.add(Broadcast[String](2))
+ factorials.map(_.toString) ~> bcast.in
+ bcast.out(0) ~> sink1
+ bcast.out(1) ~> bufferedSink2
+ ClosedShape
+ })
+
+ g.run()
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
new file mode 100644
index 0000000..c2f8d5f
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+object Test15 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+ )
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ implicit val system = ActorSystem("Test15", akkaConf)
+ implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+ case true =>
+ GearpumpMaterializer()
+ case false =>
+ ActorMaterializer(
+ ActorMaterializerSettings(system).withAutoFusing(false)
+ )
+ }
+ import akka.stream.scaladsl.GraphDSL.Implicits._
+ RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
+ val A = builder.add(Source.single(0)).out
+ val B = builder.add(Broadcast[Int](2))
+ val C = builder.add(Merge[Int](2).named("C"))
+ val D = builder.add(Flow[Int].map(_ + 1).named("D"))
+ val E = builder.add(Balance[Int](2).named("E"))
+ val F = builder.add(Merge[Int](2).named("F"))
+ val G = builder.add(Sink.foreach(println).named("G")).in
+
+ C <~ F
+ A ~> B ~> C ~> F
+ B ~> D ~> E ~> F
+ E ~> G
+
+ ClosedShape
+ }).run()
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
new file mode 100644
index 0000000..eb0b5c7
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
+import akka.stream.scaladsl.Sink
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.{CollectionDataSource, LoggerSink}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * All remote
+ */
+object Test16 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ implicit val system = ActorSystem("Test16", akkaConf)
+ implicit val materializer = GearpumpMaterializer()
+
+ val sink = GearSink.to(new LoggerSink[String])
+ val sourceData = new CollectionDataSource(
+ List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
+ val source = GearSource.from[String](sourceData)
+ source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink)
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
new file mode 100644
index 0000000..21f1b8c
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl._
+import akka.stream.{ActorMaterializer, ClosedShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ *
+ * This tests how different Materializers can be used together in an explicit way.
+ *
+ */
+object Test2 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ implicit val system = ActorSystem("Test2", akkaConf)
+ val gearpumpMaterializer = GearpumpMaterializer()
+
+ val echo = system.actorOf(Props(new Echo()))
+ val source = GearSource.bridge[String, String]
+ val sink = GearSink.bridge[String, String]
+
+ val flow = Flow[String].filter(_.startsWith("red")).map("I want to order item: " + _)
+ val (entry, exit) = flow.runWith(source, sink)(gearpumpMaterializer)
+
+ val actorMaterializer = ActorMaterializer()
+
+ val externalSource = Source(
+ List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+ )
+ val externalSink = Sink.actorRef(echo, "COMPLETE")
+
+ RunnableGraph.fromGraph(
+ GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+ externalSource ~> Sink.fromSubscriber(entry)
+ Source.fromPublisher(exit) ~> externalSink
+ ClosedShape
+ }
+ ).run()(actorMaterializer)
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ class Echo extends Actor {
+ def receive: Receive = {
+ case any: AnyRef =>
+ println("Confirm received: " + any)
+ }
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
new file mode 100644
index 0000000..0a51078
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSource
+import akka.stream.scaladsl.Sink
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * read from remote and write to local
+ */
+object Test3 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+ )
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ implicit val system = ActorSystem("Test3", akkaConf)
+ implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+ case true =>
+ GearpumpMaterializer()
+ case false =>
+ ActorMaterializer(
+ ActorMaterializerSettings(system).withAutoFusing(false)
+ )
+ }
+ val echo = system.actorOf(Props(new Echo()))
+ val sink = Sink.actorRef(echo, "COMPLETE")
+ val sourceData = new CollectionDataSource(
+ List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
+ val source = GearSource.from[String](sourceData)
+ source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink)
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ class Echo extends Actor {
+ def receive: Receive = {
+ case any: AnyRef =>
+ println("Confirm received: " + any)
+ }
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
new file mode 100644
index 0000000..3cb69ce
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSink
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.LoggerSink
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * read from local and write to remote
+ */
+object Test4 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ implicit val system = ActorSystem("Test4", akkaConf)
+ implicit val materializer = GearpumpMaterializer()
+
+ Source(
+ List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+ ).filter(_.startsWith("red")).
+ map("I want to order item: " + _).
+ runWith(GearSink.to(new LoggerSink[String]))
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
new file mode 100644
index 0000000..72e21c7
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.ClosedShape
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * test fanout
+ */
+object Test5 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ implicit val system = ActorSystem("Test5", akkaConf)
+ implicit val materializer = GearpumpMaterializer()
+
+ val echo = system.actorOf(Props(new Echo()))
+ val source = Source(List(("male", "24"), ("female", "23")))
+ val sink = Sink.actorRef(echo, "COMPLETE")
+
+ RunnableGraph.fromGraph(
+ GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+ val unzip = b.add(Unzip[String, String]())
+ val sink1 = Sink.actorRef(echo, "COMPLETE")
+ val sink2 = Sink.actorRef(echo, "COMPLETE")
+ source ~> unzip.in
+ unzip.out0 ~> sink1
+ unzip.out1 ~> sink1
+ ClosedShape
+ }
+ ).run()
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ class Echo extends Actor {
+ def receive: Receive = {
+ case any: AnyRef =>
+ println("Confirm received: " + any)
+ }
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
new file mode 100644
index 0000000..6f54933
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl.Sink
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSource
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * WordCount example
+ * Test GroupBy2 (groupBy which uses SubFlow is not implemented yet)
+ */
+
+import org.apache.gearpump.akkastream.scaladsl.Implicits._
+
+object Test6 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+ )
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ implicit val system = ActorSystem("Test6", akkaConf)
+ implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+ case true =>
+ GearpumpMaterializer()
+ case false =>
+ ActorMaterializer(
+ ActorMaterializerSettings(system).withAutoFusing(false)
+ )
+ }
+ val echo = system.actorOf(Props(Echo()))
+ val sink = Sink.actorRef(echo, "COMPLETE")
+ val sourceData = new CollectionDataSource(
+ List(
+ "this is a good start",
+ "this is a good time",
+ "time to start",
+ "congratulations",
+ "green plant",
+ "blue sky")
+ )
+ val source = GearSource.from[String](sourceData)
+ source.mapConcat({line =>
+ line.split(" ").toList
+ }).groupBy2(x => x)
+ .map(word => (word, 1))
+ .reduce({(a, b) =>
+ (a._1, a._2 + b._2)
+ })
+ .log("word-count")
+ .runWith(sink)
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ case class Echo() extends Actor {
+ def receive: Receive = {
+ case any: AnyRef =>
+ println("Confirm received: " + any)
+ }
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
new file mode 100644
index 0000000..be91610
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.{Broadcast, Merge, Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * This is a simplified API you can use to combine sources and sinks
+ * with junctions like: Broadcast[T], Balance[T], Merge[In] and Concat[A]
+ * without the need for using the Graph DSL
+ */
+
+object Test7 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ implicit val system = ActorSystem("Test7", akkaConf)
+ implicit val materializer = GearpumpMaterializer()
+ implicit val ec = system.dispatcher
+
+ val sourceA = Source(List(1))
+ val sourceB = Source(List(2))
+ val mergedSource = Source.combine(sourceA, sourceB)(Merge(_))
+
+ val sinkA = Sink.foreach[Int](x => println(s"In SinkA : $x"))
+ val sinkB = Sink.foreach[Int](x => println(s"In SinkB : $x"))
+ val sink = Sink.combine(sinkA, sinkB)(Broadcast[Int](_))
+ mergedSource.runWith(sink)
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
new file mode 100644
index 0000000..434aa33
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer}
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+
+/**
+ * Stream example to find sum of elements
+ */
+object Test8 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+ )
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ implicit val system = ActorSystem("Test8", akkaConf)
+ implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+ case true =>
+ GearpumpMaterializer()
+ case false =>
+ ActorMaterializer(
+ ActorMaterializerSettings(system).withAutoFusing(false)
+ )
+ }
+ implicit val ec = system.dispatcher
+
+ // Source gives 1 to 100 elements
+ val source: Source[Int, NotUsed] = Source(Stream.from(1).take(100))
+ val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
+
+ val result: Future[Int] = source.runWith(sink)
+ result.map(sum => {
+ println(s"Sum of stream elements => $sum")
+ })
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
new file mode 100644
index 0000000..63f9e2d
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape}
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+
+/**
+ * Stream example showing Broadcast
+ */
+object Test9 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+ )
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ implicit val system = ActorSystem("Test9", akkaConf)
+ implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+ case true =>
+ GearpumpMaterializer()
+ case false =>
+ ActorMaterializer(
+ ActorMaterializerSettings(system).withAutoFusing(false)
+ )
+ }
+ implicit val ec = system.dispatcher
+
+ val sinkActor = system.actorOf(Props(new SinkActor()))
+ val source = Source((1 to 5))
+ val sink = Sink.actorRef(sinkActor, "COMPLETE")
+ val flowA: Flow[Int, Int, NotUsed] = Flow[Int].map {
+ x => println(s"processing broadcasted element : $x in flowA"); x
+ }
+ val flowB: Flow[Int, Int, NotUsed] = Flow[Int].map {
+ x => println(s"processing broadcasted element : $x in flowB"); x
+ }
+
+ val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+ val broadcast = b.add(Broadcast[Int](2))
+ val merge = b.add(Merge[Int](2))
+ source ~> broadcast
+ broadcast ~> flowA ~> merge
+ broadcast ~> flowB ~> merge
+ merge ~> sink
+ ClosedShape
+ })
+
+ graph.run()
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ class SinkActor extends Actor {
+ def receive: Receive = {
+ case any: AnyRef =>
+ println("Confirm received: " + any)
+ }
+ }
+ // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
new file mode 100644
index 0000000..7e2211d
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.io.{File, FileInputStream}
+import java.util.zip.GZIPInputStream
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.scaladsl._
+import akka.stream.{ClosedShape, IOResult}
+import akka.util.ByteString
+import org.apache.gearpump.akkastream.graph.GraphPartitioner
+import org.apache.gearpump.akkastream.{GearAttributes, GearpumpMaterializer}
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+import org.json4s.JsonAST.JString
+import org.json4s.jackson.JsonMethods
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
+
+/**
+ * this example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/
+ * which showcases running Akka Streams DSL across JVMs on Gearpump
+ *
+ * Usage: output/target/pack/bin/gear app
+ * -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar
+ * -input wikidata-${DATE}-all.json.gz -languages en,de
+ *
+ * (Note: Wikipedia data can be downloaded from https://dumps.wikimedia.org/wikidatawiki/entities/)
+ *
+ */
+object WikipediaApp extends ArgumentsParser with AkkaApp {
+
+ case class WikidataElement(id: String, sites: Map[String, String])
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "input" -> CLIOption[String]("<Wikidata JSON dump>", required = true),
+ "languages" -> CLIOption[String]("<languages to take into account>", required = true)
+ )
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val parsed = parse(args)
+ val input = new File(parsed.getString("input"))
+ val langs = parsed.getString("languages").split(",")
+
+ implicit val system = ActorSystem("WikipediaApp", akkaConf)
+ implicit val materializer =
+ GearpumpMaterializer(GraphPartitioner.TagAttributeStrategy)
+ import system.dispatcher
+
+ val elements = source(input).via(parseJson(langs))
+
+ val g = RunnableGraph.fromGraph(
+ GraphDSL.create(count) { implicit b =>
+ sinkCount => {
+ import GraphDSL.Implicits._
+ val broadcast = b.add(Broadcast[WikidataElement](2))
+ elements ~> broadcast ~> logEveryNSink(1000)
+ broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
+ ClosedShape
+ }
+ }
+ )
+
+ g.run().onComplete { x =>
+ x match {
+ case Success((t, f)) => printResults(t, f)
+ // scalastyle:off println
+ case Failure(tr) => println("Something went wrong")
+ // scalastyle:on println
+ }
+ }
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ def source(file: File): Source[String, Future[IOResult]] = {
+ val compressed = new GZIPInputStream(new FileInputStream(file), 65536)
+ StreamConverters.fromInputStream(() => compressed)
+ .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
+ .map(x => x.decodeString("utf-8"))
+ }
+
+ def parseJson(langs: Seq[String])(implicit ec: ExecutionContext):
+ Flow[String, WikidataElement, NotUsed] =
+ Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect({
+ case Some(v) => v
+ })
+
+ def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = {
+ Try(JsonMethods.parse(line)).toOption.flatMap { json =>
+ json \ "id" match {
+ case JString(itemId) =>
+
+ val sites: Seq[(String, String)] = for {
+ lang <- langs
+ JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title"
+ } yield lang -> title
+
+ if(sites.isEmpty) None
+ else Some(WikidataElement(id = itemId, sites = sites.toMap))
+
+ case _ => None
+ }
+ }
+ }
+
+ def logEveryNSink[T](n: Int): Sink[T, Future[Int]] = Sink.fold(0) { (x, y: T) =>
+ if (x % n == 0) {
+ // scalastyle:off println
+ println(s"Processing element $x: $y")
+ // scalastyle:on println
+ }
+ x + 1
+ }
+
+ def checkSameTitles(langs: Set[String]):
+ Flow[WikidataElement, Boolean, NotUsed] = Flow[WikidataElement]
+ .filter(_.sites.keySet == langs)
+ .map { x =>
+ val titles = x.sites.values
+ titles.forall( _ == titles.head)
+ }.withAttributes(GearAttributes.remote)
+
+ def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
+ case ((t, f), true) => (t + 1, f)
+ case ((t, f), false) => (t, f + 1)
+ }
+
+ def printResults(t: Int, f: Int): Unit = {
+ val message = s"""
+ | Number of items with the same title: $t
+ | Number of items with the different title: $f
+ | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)}
+ """.stripMargin
+ // scalastyle:off println
+ println(message)
+ // scalastyle:on println
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
new file mode 100644
index 0000000..c1e95bb
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.stream.{Shape, SinkShape, SourceShape}
+import org.apache.gearpump.akkastream.GearAttributes
+import org.apache.gearpump.akkastream.GearAttributes.{Local, Location, Remote}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
+import org.apache.gearpump.akkastream.module._
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.fusing.GraphStageModule
+import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource}
+import akka.stream.impl.{SinkModule, SourceModule}
+import org.apache.gearpump.util.Graph
+
+/**
+ *
+ * GraphPartitioner is used to decide which part will be rendered locally
+ * and which part should be rendered remotely.
+ *
+ * We will cut the graph based on the [[Strategy]] provided.
+ *
+ * For example, for the following graph, we can cut the graph to
+ * two parts, each part will be a Sub Graph. The top SubGraph
+ * can be materialized remotely. The bottom part can be materialized
+ * locally.
+ *
+ * AtomicModule2 -> AtomicModule4
+ * /| \
+ * / \
+ * -----------cut line -------------cut line ----------
+ * / \
+ * / \|
+ * AtomicModule1 AtomicModule5
+ * \ /|
+ * \ /
+ * \| /
+ * AtomicModule3
+ *
+ * @see [[akka.stream.impl.MaterializerSession]] for more information of how Graph is organized.
+ *
+ */
+class GraphPartitioner(strategy: Strategy) {
+ def partition(moduleGraph: Graph[Module, Edge]): List[SubGraph] = {
+ val graph = removeDummyModule(moduleGraph)
+ val tags = tag(graph, strategy)
+ doPartition(graph, tags)
+ }
+
+ private def doPartition(graph: Graph[Module, Edge], tags: Map[Module, Location]):
+ List[SubGraph] = {
+ val local = Graph.empty[Module, Edge]
+ val remote = Graph.empty[Module, Edge]
+
+ graph.vertices.foreach{ module =>
+ if (tags(module) == Local) {
+ local.addVertex(module)
+ } else {
+ remote.addVertex(module)
+ }
+ }
+
+ graph.edges.foreach{ nodeEdgeNode =>
+ val (node1, edge, node2) = nodeEdgeNode
+ (tags(node1), tags(node2)) match {
+ case (Local, Local) =>
+ local.addEdge(nodeEdgeNode)
+ case (Remote, Remote) =>
+ remote.addEdge(nodeEdgeNode)
+ case (Local, Remote) =>
+ node2 match {
+ case bridge: BridgeModule[_, _, _] =>
+ local.addEdge(node1, edge, node2)
+ case _ =>
+ // create a bridge module in between
+ val bridge = new SourceBridgeModule[AnyRef, AnyRef]()
+ val remoteEdge = Edge(bridge.outPort, edge.to)
+ remote.addEdge(bridge, remoteEdge, node2)
+ val localEdge = Edge(edge.from, bridge.inPort)
+ local.addEdge(node1, localEdge, bridge)
+ }
+ case (Remote, Local) =>
+ node1 match {
+ case bridge: BridgeModule[_, _, _] =>
+ local.addEdge(node1, edge, node2)
+ case _ =>
+ // create a bridge module in between
+ val bridge = new SinkBridgeModule[AnyRef, AnyRef]()
+ val remoteEdge = Edge(edge.from, bridge.inPort)
+ remote.addEdge(node1, remoteEdge, bridge)
+ val localEdge = Edge(bridge.outPort, edge.to)
+ local.addEdge(bridge, localEdge, node2)
+ }
+ }
+ }
+
+ List(new RemoteGraph(remote), new LocalGraph(local))
+ }
+
+ private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] = {
+ graph.vertices.map{vertex =>
+ vertex -> strategy.apply(vertex)
+ }.toMap
+ }
+
+ private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = {
+ val graph = inputGraph.copy
+ val dummies = graph.vertices.filter {module =>
+ module match {
+ case dummy: DummyModule =>
+ true
+ case _ =>
+ false
+ }
+ }
+ dummies.foreach(module => graph.removeVertex(module))
+ graph
+ }
+}
+
+object GraphPartitioner {
+
+ type Strategy = PartialFunction[Module, Location]
+
+ val BaseStrategy: Strategy = {
+ case source: BridgeModule[_, _, _] =>
+ Remote
+ case task: GearpumpTaskModule =>
+ Remote
+ case groupBy: GroupByModule[_, _] =>
+ // TODO: groupBy is not supported by local materializer
+ Remote
+ case source: SourceModule[_, _] =>
+ Local
+ case sink: SinkModule[_, _] =>
+ Local
+ case remaining: Module =>
+ remaining.shape match {
+ case sourceShape: SourceShape[_] =>
+ Local
+ case sinkShape: SinkShape[_] =>
+ Local
+ case otherShapes: Shape =>
+ Remote
+ }
+ }
+
+ val AllRemoteStrategy: Strategy = BaseStrategy orElse {
+ case graphStageModule: GraphStageModule =>
+ graphStageModule.stage match {
+ case matValueSource: MaterializedValueSource[_] =>
+ Local
+ case singleSource: SingleSource[_] =>
+ Local
+ case _ =>
+ Remote
+ }
+ case _ =>
+ Remote
+ }
+
+ /**
+ * Will decide whether to render a module locally or remotely
+ * based on Attribute settings.
+ *
+ */
+ val TagAttributeStrategy: Strategy = BaseStrategy orElse {
+ case module =>
+ GearAttributes.location(module.attributes)
+ }
+
+ val AllLocalStrategy: Strategy = BaseStrategy orElse {
+ case graphStageModule: GraphStageModule =>
+ // TODO kasravi review
+ graphStageModule.stage match {
+ case matValueSource: MaterializedValueSource[_] =>
+ Local
+ case _ =>
+ Local
+ }
+ case _ =>
+ Local
+ }
+
+ def apply(strategy: Strategy): GraphPartitioner = {
+ new GraphPartitioner(strategy)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
new file mode 100644
index 0000000..c03fce2
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.actor.ActorSystem
+import akka.stream.impl.Stages.DefaultAttributes
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.{PublisherSource, SubscriberSink}
+import akka.stream.{SinkShape, SourceShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.materializer.LocalMaterializerImpl
+import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule}
+import org.apache.gearpump.util.Graph
+import org.reactivestreams.{Publisher, Subscriber}
+
+/**
+ *
+ * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph, which only
+ * contain module that can be materialized in local JVM.
+ *
+ * @param graph Graph[Module, Edge]
+ */
+class LocalGraph(override val graph: Graph[Module, Edge]) extends SubGraph
+
+object LocalGraph {
+
+ /**
+ * materialize LocalGraph in local JVM
+ * @param system ActorSystem
+ */
+ class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer {
+
+ // create a local materializer
+ val materializer = LocalMaterializerImpl()(system)
+
+ /**
+ *
+ * @param matValues Materialized Values for each module before materialization
+ * @return Materialized Values for each Module after the materialization.
+ */
+ override def materialize(graph: SubGraph,
+ matValues: scala.collection.mutable.Map[Module, Any]):
+ scala.collection.mutable.Map[Module, Any] = {
+ val newGraph: Graph[Module, Edge] = graph.graph.mapVertex {
+ case source: SourceBridgeModule[in, out] =>
+ val subscriber = matValues(source).asInstanceOf[Subscriber[in]]
+ val shape: SinkShape[in] = SinkShape(source.inPort)
+ new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape)
+ case sink: SinkBridgeModule[in, out] =>
+ val publisher = matValues(sink).asInstanceOf[Publisher[out]]
+ val shape: SourceShape[out] = SourceShape(sink.outPort)
+ new PublisherSource(publisher, DefaultAttributes.publisherSource, shape)
+ case other =>
+ other
+ }
+ materializer.materialize(newGraph, matValues)
+ }
+
+ override def shutdown: Unit = {
+ materializer.shutdown()
+ }
+ }
+}
+