You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/11 02:33:20 UTC

[3/5] incubator-gearpump git commit: Fixes #22 support akka-streams Gearpump Materializer

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
new file mode 100644
index 0000000..75dc95a
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import akka.NotUsed
+import akka.actor.{ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, ExtendedActorSystem}
+import akka.event.{Logging, LoggingAdapter}
+import akka.stream.Attributes.Attribute
+import akka.stream._
+import akka.stream.impl.Stages.SymbolicGraphStage
+import akka.stream.impl.StreamLayout._
+import akka.stream.impl._
+import akka.stream.impl.fusing.{GraphInterpreterShell, GraphStageModule}
+import akka.stream.scaladsl.ModuleExtractor
+import akka.stream.stage.GraphStage
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
+import org.apache.gearpump.akkastream.graph.LocalGraph.LocalGraphMaterializer
+import org.apache.gearpump.akkastream.graph.RemoteGraph.RemoteGraphMaterializer
+import org.apache.gearpump.akkastream.graph._
+import org.apache.gearpump.akkastream.util.MaterializedValueOps
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContextExecutor, Promise}
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * Factory methods and helper types for [[GearpumpMaterializer]].
+ */
+object GearpumpMaterializer {
+
+  /** When true, `materialize` prints each module of the translated graph to stdout. */
+  final val Debug = true
+
+  /** Edge label of the module graph: the upstream outlet and the downstream inlet it feeds. */
+  final case class Edge(from: OutPort, to: InPort)
+
+  /** Attribute that carries a [[MaterializedValueNode]] along with a module. */
+  final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode) extends Attribute
+
+  /** Lets a plain Boolean be passed where an AtomicBoolean is expected. */
+  implicit def boolToAtomic(bool: Boolean): AtomicBoolean = new AtomicBoolean(bool)
+
+  /**
+   * Creates a materializer that uses the given partition strategy, with auto-fusing
+   * disabled, `useLocalCluster = false` and the name prefix "flow".
+   *
+   * @param strategy decides which modules are rendered remotely and which locally
+   * @param context  an ActorSystem or ActorContext used to create the supervisor actor
+   */
+  def apply(strategy: Strategy)(implicit context: ActorRefFactory):
+  ExtendedActorMaterializer = {
+    val system = actorSystemOf(context)
+
+    apply(ActorMaterializerSettings(
+      system).withAutoFusing(false), strategy, useLocalCluster = false, "flow")(context)
+  }
+
+  /**
+   * Creates a materializer, filling in defaults for everything that is not supplied.
+   *
+   * @param materializerSettings settings to use; when None, the system defaults with
+   *                             auto-fusing disabled are used
+   * @param strategy             partition strategy, all-remote by default
+   * @param useLocalCluster      whether to use the built-in in-process local cluster
+   * @param namePrefix           name prefix; "flow" when None
+   * @param context              an ActorSystem or ActorContext
+   */
+  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
+      strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
+      useLocalCluster: Boolean = true,
+      namePrefix: Option[String] = None)(implicit context: ActorRefFactory):
+    ExtendedActorMaterializer = {
+    val system = actorSystemOf(context)
+
+    val settings = materializerSettings getOrElse
+      ActorMaterializerSettings(system).withAutoFusing(false)
+    apply(settings, strategy, useLocalCluster, namePrefix.getOrElse("flow"))(context)
+  }
+
+  /**
+   * Creates a materializer from fully specified arguments and starts its stream
+   * supervisor actor under `context`.
+   *
+   * @param materializerSettings settings for the materializer and its supervisor
+   * @param strategy             partition strategy handed to the materializer
+   * @param useLocalCluster      whether to use the built-in in-process local cluster
+   * @param namePrefix           name prefix handed to the materializer
+   * @param context              an ActorSystem or ActorContext
+   */
+  def apply(materializerSettings: ActorMaterializerSettings,
+      strategy: Strategy,
+      useLocalCluster: Boolean,
+      namePrefix: String)(implicit context: ActorRefFactory):
+    ExtendedActorMaterializer = {
+    val system = actorSystemOf(context)
+
+    // The Boolean `false` is converted to an AtomicBoolean through boolToAtomic.
+    val supervisor = context.actorOf(
+      StreamSupervisor.props(materializerSettings, false).withDispatcher(
+        materializerSettings.dispatcher), StreamSupervisor.nextName())
+
+    // Forward every caller-supplied option; previously strategy, useLocalCluster and
+    // namePrefix were dropped here and the constructor defaults were always used.
+    new GearpumpMaterializer(
+      system,
+      materializerSettings,
+      supervisor,
+      strategy,
+      useLocalCluster,
+      Some(namePrefix))
+  }
+
+
+  /**
+   * Resolves the ActorSystem behind an ActorRefFactory.
+   *
+   * @throws IllegalArgumentException if `context` is null or is neither an
+   *                                  ExtendedActorSystem nor an ActorContext
+   */
+  private def actorSystemOf(context: ActorRefFactory): ActorSystem =
+    context match {
+      case s: ExtendedActorSystem => s
+      case c: ActorContext => c.system
+      case null => throw new IllegalArgumentException("ActorRefFactory context must be defined")
+      case _ =>
+        throw new IllegalArgumentException(
+          s"""
+             |  context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]
+          """.stripMargin
+        )
+    }
+
+}
+
+/**
+ *
+ * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump
+ * streaming application. If some module cannot be rendered remotely in Gearpump
+ * Cluster, then it will use local Actor materializer as fallback to materialize
+ * the module locally.
+ *
+ * User can customize a [[org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy]]
+ * to determine which module should be rendered
+ * remotely, and which module should be rendered locally.
+ *
+ * @see [[org.apache.gearpump.akkastream.graph.GraphPartitioner]]
+ *     to find out how we cut the runnableGraph to two parts,
+ *      and materialize them separately.
+ * @param system          ActorSystem
+ * @param settings        materializer settings; also the base for `effectiveSettings`
+ * @param supervisor      stream supervisor actor
+ * @param strategy        Strategy used to partition the graph into sub graphs
+ * @param useLocalCluster whether to use built-in in-process local cluster
+ * @param namePrefix      name prefix; not referenced by the code in this class
+ */
+class GearpumpMaterializer(override val system: ActorSystem,
+    override val settings: ActorMaterializerSettings,
+    override val supervisor: ActorRef,
+    strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
+    useLocalCluster: Boolean = true, namePrefix: Option[String] = None)
+  extends ExtendedActorMaterializer {
+
+  // One sub materializer per kind of sub graph produced by the partitioner.
+  private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map(
+    classOf[LocalGraph] -> new LocalGraphMaterializer(system),
+    classOf[RemoteGraph] -> new RemoteGraphMaterializer(useLocalCluster, system)
+  )
+
+  override def logger: LoggingAdapter = Logging.getLogger(system, this)
+
+  /** True once the actor system has terminated. */
+  override def isShutdown: Boolean = system.whenTerminated.isCompleted
+
+  /**
+   * Applies the input-buffer, dispatcher and supervision attributes found in `opAttr`
+   * on top of the base settings; all other attributes are ignored.
+   */
+  override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
+    import ActorAttributes._
+    import Attributes._
+    opAttr.attributeList.foldLeft(settings) { (s, attr) =>
+      attr match {
+        case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
+        case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
+        case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
+        case _ => s
+      }
+    }
+  }
+
+  /** Not supported by this materializer. */
+  override def withNamePrefix(name: String): ExtendedActorMaterializer =
+    throw new UnsupportedOperationException("withNamePrefix is not supported")
+
+  /**
+   * Execution context used by the scheduling methods below. It previously threw
+   * UnsupportedOperationException, which made both scheduling methods unusable.
+   */
+  override implicit def executionContext: ExecutionContextExecutor =
+    system.dispatcher
+
+  override def schedulePeriodically(initialDelay: FiniteDuration,
+      interval: FiniteDuration,
+      task: Runnable): Cancellable =
+    system.scheduler.schedule(initialDelay, interval, task)(executionContext)
+
+  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
+    system.scheduler.scheduleOnce(delay, task)(executionContext)
+
+  /**
+   * Translates the runnable graph into a graph of modules, partitions it with the
+   * configured strategy and materializes each sub graph with the matching sub
+   * materializer.
+   *
+   * @param runnableGraph the closed graph to run
+   * @return the collected materialized values, cast to `Mat`
+   */
+  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
+    // NOTE(review): built but not used anywhere below.
+    val initialAttributes = Attributes(
+      Attributes.InputBuffer(
+        settings.initialInputBufferSize,
+        settings.maxInputBufferSize
+      ) ::
+      ActorAttributes.Dispatcher(settings.dispatcher) ::
+      ActorAttributes.SupervisionStrategy(settings.supervisionDecider) ::
+      Nil)
+
+    val info = Fusing.aggressive(runnableGraph).module.info
+    import _root_.org.apache.gearpump.util.{Graph => GGraph}
+    val graph = GGraph.empty[Module, Edge]
+
+    // Only copied modules are considered. Each one is represented in the graph by the
+    // module it was copied from, and ports are translated by position.
+    info.allModules.foreach(module => {
+      if (module.isCopied) {
+        val original = module.asInstanceOf[CopiedModule].copyOf
+        graph.addVertex(original)
+        module.shape.outlets.zip(original.shape.outlets).foreach(out => {
+          val (cout, oout) = out
+          val cin = info.downstreams(cout)
+          val downStreamModule = info.inOwners(cin)
+          // An edge is added only when the downstream module is a copy as well.
+          if(downStreamModule.isCopied) {
+            val downStreamOriginal = downStreamModule.asInstanceOf[CopiedModule].copyOf
+            downStreamModule.shape.inlets.zip(downStreamOriginal.shape.inlets).foreach {
+              case (copiedIn, originalIn) =>
+                if (copiedIn == cin) {
+                  graph.addEdge(original, Edge(oout, originalIn), downStreamOriginal)
+                }
+            }
+          }
+        })
+      }
+    })
+
+    if(Debug) {
+      // Print the modules in topological order.
+      val iterator = graph.topologicalOrderIterator
+      while (iterator.hasNext) {
+        val module = iterator.next()
+        // scalastyle:off println
+        module match {
+          case graphStageModule: GraphStageModule =>
+            graphStageModule.stage match {
+              case symbolicGraphStage: SymbolicGraphStage[_, _, _] =>
+                val symbolicName = symbolicGraphStage.symbolicStage.getClass.getSimpleName
+                println(
+                  s"${module.getClass.getSimpleName}(${symbolicName})"
+                )
+              case graphStage: GraphStage[_] =>
+                val name = graphStage.getClass.getSimpleName
+                println(
+                  s"${module.getClass.getSimpleName}(${name})"
+                )
+              case other =>
+                println(
+                  s"${module.getClass.getSimpleName}(${other.getClass.getSimpleName})"
+                )
+            }
+          case _ =>
+            println(module.getClass.getSimpleName)
+        }
+        // scalastyle:on println
+      }
+    }
+
+    val subGraphs = GraphPartitioner(strategy).partition(graph)
+    // Each sub graph sees the values materialized by the sub graphs before it.
+    val matValues = subGraphs.foldLeft(mutable.Map.empty[Module, Any]) { (map, subGraph) =>
+      val materializer = subMaterializers(subGraph.getClass)
+      map ++ materializer.materialize(subGraph, map)
+    }
+    // Keep the values of sink-shaped modules, except those that are NotUsed.
+    val mat = matValues.flatMap(pair => {
+      val (module, any) = pair
+      any match {
+        case notUsed: NotUsed =>
+          None
+        case others =>
+          val rt = module.shape match {
+            case sink: SinkShape[_] =>
+              Some(any)
+            case _ =>
+              None
+          }
+          rt
+      }
+    }).toList
+    val matModule = subGraphs.last.graph.topologicalOrderIterator.toList.last
+    // NOTE(review): mat2 is computed but never returned, while `mat` below is a List,
+    // so the Promise case cannot match it. Confirm which value callers should get.
+    val mat2 = resolveMaterialized(matModule.materializedValueComputation, matValues)
+    val rt = Some(mat).flatMap(any => {
+      any match {
+        case promise: Promise[_] =>
+          Some(promise.future)
+        case other =>
+          Some(other)
+      }
+    })
+    rt.getOrElse(null).asInstanceOf[Mat]
+  }
+
+  /** Same as the single-argument `materialize`; `subflowFuser` is ignored. */
+  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
+      subflowFuser: GraphInterpreterShell => ActorRef): Mat = {
+    materialize(runnableGraph)
+  }
+
+  /** Shuts down every sub materializer. */
+  def shutdown: Unit = {
+    subMaterializers.values.foreach(_.shutdown)
+  }
+
+  /**
+   * Evaluates a materialized value computation against already materialized values.
+   * A module without an entry, and `Ignore`, both yield the unit value.
+   */
+  private def resolveMaterialized(mat: MaterializedValueNode,
+      materializedValues: mutable.Map[Module, Any]): Any = mat match {
+    case Atomic(m) =>
+      materializedValues.getOrElse(m, ())
+    case Combine(f, d1, d2) =>
+      f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
+    case Transform(f, d) =>
+      f(resolveMaterialized(d, materializedValues))
+    case Ignore =>
+      ()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
new file mode 100644
index 0000000..871dcf8
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import java.{util => ju}
+
+import _root_.org.apache.gearpump.util.{Graph => GGraph}
+import akka.actor.ActorSystem
+import akka.stream._
+import org.apache.gearpump.akkastream.GearpumpMaterializer.{Edge, MaterializedValueSourceAttribute}
+import akka.stream.impl.StreamLayout._
+import akka.stream.impl._
+import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
+
+/**
+ * A [[MaterializerSession]] that records the modules it visits as vertices of a
+ * Gearpump graph instead of running them.
+ *
+ * @param system            actor system used to generate flow names
+ * @param topLevel          the top level module to materialize
+ * @param initialAttributes attributes the materialization starts with
+ * @param namePrefix        prefix of generated flow names; "flow" when None
+ */
+class GearpumpMaterializerSession(system: ActorSystem, topLevel: Module,
+    initialAttributes: Attributes, namePrefix: Option[String] = None)
+  extends MaterializerSession(topLevel, initialAttributes) {
+
+  private[this] def createFlowName(): String =
+    FlowNames(system).name.copy(namePrefix.getOrElse("flow")).next()
+
+  private val flowName = createFlowName()
+  private var nextId = 0
+
+  // Builds "<flowName>-<id>-<attribute name>" and advances the id counter.
+  private def stageName(attr: Attributes): String = {
+    val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
+    nextId += 1
+    name
+  }
+
+  /** The module graph collected by this session. */
+  val graph = GGraph.empty[Module, Edge]
+
+  /** Connects the publisher's module to the subscriber's module, labelled with both ports. */
+  def addEdge(publisher: (OutPort, Module), subscriber: (InPort, Module)): Unit = {
+    graph.addEdge(publisher._2, Edge(publisher._1, subscriber._1), subscriber._2)
+  }
+
+  /** Adds a module to the graph. */
+  def addVertex(module: Module): Unit = {
+    graph.addVertex(module)
+  }
+
+  /**
+   * Visits every sub module of `module` and resolves the module's materialized value
+   * computation against the values collected on the way.
+   *
+   * @param module           the module whose sub modules are materialized
+   * @param parentAttributes attributes inherited from the enclosing module
+   * @return the resolved materialized value of `module`
+   */
+  override def materializeModule(module: Module, parentAttributes: Attributes): Any = {
+
+    val materializedValues: ju.Map[Module, Any] = new ju.HashMap
+    val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
+
+    // NOTE(review): this list is never populated, so the loop over it below does nothing.
+    val materializedValueSources = List.empty[MaterializedValueSource[_]]
+
+    for (submodule <- module.subModules) {
+      submodule match {
+        case atomic: AtomicModule =>
+          materializeAtomic(atomic, currentAttributes, materializedValues)
+        case copied: CopiedModule =>
+          enterScope(copied)
+          materializedValues.put(copied, materializeModule(copied, currentAttributes))
+          exitScope(copied)
+        // Must come before the catch-all below; it used to sit after it and was
+        // therefore unreachable.
+        case EmptyModule =>
+        case composite =>
+          materializedValues.put(composite, materializeComposite(composite, currentAttributes))
+      }
+    }
+
+    val mat = resolveMaterialized(module.materializedValueComputation, materializedValues)
+
+    materializedValueSources.foreach { module =>
+      val matAttribute =
+        new MaterializedValueSourceAttribute(mat.asInstanceOf[MaterializedValueNode])
+      val copied = copyAtomicModule(module.module, parentAttributes
+        and Attributes(matAttribute))
+      // TODO
+      // assignPort(module.shape.out, (copied.shape.outlets.head, copied))
+      addVertex(copied)
+      materializedValues.put(copied, Atomic(copied))
+    }
+    mat
+
+  }
+
+  /** Composite modules are materialized exactly like any other module. */
+  override protected def materializeComposite(composite: Module,
+      effectiveAttributes: Attributes): Any = {
+    materializeModule(composite, effectiveAttributes)
+  }
+
+  /**
+   * Copies the atomic module with the merged attributes, adds the copy to the graph
+   * and records `Atomic(copy)` as the materialized value of the original module.
+   *
+   * @param atomic           the atomic module to record
+   * @param parentAttributes attributes inherited from the enclosing module
+   * @param matVal           map that receives the module's materialized value
+   */
+  protected def materializeAtomic(atomic: AtomicModule,
+      parentAttributes: Attributes,
+    matVal: ju.Map[Module, Any]): Unit = {
+
+    val (inputs, outputs) = (atomic.shape.inlets, atomic.shape.outlets)
+    val copied = copyAtomicModule(atomic, parentAttributes)
+
+    // The port lookups below are not used yet; the assignPort calls are still TODO.
+    for ((in, id) <- inputs.zipWithIndex) {
+      val inPort = inPortMapping(atomic, copied)(in)
+      // assignPort(in, (inPort, copied))
+    }
+
+    for ((out, id) <- outputs.zipWithIndex) {
+      val outPort = outPortMapping(atomic, copied)(out)
+      // TODO
+      // assignPort(out, (outPort, copied))
+    }
+
+    addVertex(copied)
+    matVal.put(atomic, Atomic(copied))
+  }
+
+  // Returns the module with its own attributes merged into the parent's attributes.
+  private def copyAtomicModule[T <: Module](module: T, parentAttributes: Attributes): T = {
+    val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
+    module.withAttributes(currentAttributes).asInstanceOf[T]
+  }
+
+  // Pairs the outlets of `from` with the outlets of `to` by position.
+  private def outPortMapping(from: Module, to: Module): Map[OutPort, OutPort] = {
+    from.shape.outlets.iterator.zip(to.shape.outlets.iterator).toList.toMap
+  }
+
+  // Pairs the inlets of `from` with the inlets of `to` by position.
+  private def inPortMapping(from: Module, to: Module): Map[InPort, InPort] = {
+    from.shape.inlets.iterator.zip(to.shape.inlets.iterator).toList.toMap
+  }
+
+  /**
+   * Evaluates a materialized value computation against the collected values.
+   * An `Atomic` node without an entry yields null (java.util.Map#get); `Ignore`
+   * yields the `Ignore` node itself.
+   */
+  protected def resolveMaterialized(matNode: MaterializedValueNode,
+      materializedValues: ju.Map[Module, Any]):
+    Any =
+    matNode match {
+      case Atomic(m) => materializedValues.get(m)
+      case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues),
+        resolveMaterialized(d2, materializedValues))
+      case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
+      case Ignore => Ignore
+    }
+}
+
+/** Companion factory for [[GearpumpMaterializerSession]]. */
+object GearpumpMaterializerSession {
+  /** Builds a session; every argument is handed to the constructor unchanged. */
+  def apply(system: ActorSystem, topLevel: Module,
+      initialAttributes: Attributes, namePrefix: Option[String] = None):
+  GearpumpMaterializerSession =
+    new GearpumpMaterializerSession(
+      system = system,
+      topLevel = topLevel,
+      initialAttributes = initialAttributes,
+      namePrefix = namePrefix)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
new file mode 100644
index 0000000..2ce4e19
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl.{Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * Runs a small pipeline (filter, fold, map) over a fixed list of item names with a
+ * [[GearpumpMaterializer]] and sends the result to an [[Test.Echo]] actor.
+ *
+ * NOTE(review): the earlier header said the source and sink run locally and the
+ * remaining stages remotely; the materializer is created with its defaults here,
+ * so confirm that against the default partition strategy.
+ */
+object Test extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    // The sink forwards every element to the echo actor, then "COMPLETE" at the end.
+    val echoActor = system.actorOf(Props(new Echo()))
+    val echoSink = Sink.actorRef(echoActor, "COMPLETE")
+
+    val items =
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+
+    Source(items)
+      .filter(_.startsWith("red"))
+      .fold("Items:")((joined, item) => joined + "|" + item)
+      .map(order => "I want to order item: " + order)
+      .runWith(echoSink)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Prints every message it receives. */
+  class Echo extends Actor {
+    def receive: Receive = {
+      case message: AnyRef =>
+        println("Confirm received: " + message)
+    }
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
new file mode 100644
index 0000000..71678c3
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.{ClosedShape, ThrottleMode}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+ 
+/**
+ * Stream example showing Conflate, Throttle
+ *
+ * Two endless sources ("A" and "B") are zipped: the first goes through a throttle,
+ * the second through a conflate. Every zipped pair is printed and then discarded.
+ */
+object Test10 extends AkkaApp with ArgumentsParser {
+
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    // Use the configuration handed to main, as the other examples in this package do.
+    implicit val system = ActorSystem("Test10", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+
+    // An endless stream that repeats the same element.
+    def stream(x: String) = Stream.continually(x)
+
+    val sourceA = Source(stream("A"))
+    val sourceB = Source(stream("B"))
+
+    // Throttle: 1 element per second, burst of 1, shaping mode.
+    val throttler: Flow[String, String, NotUsed] =
+      Flow[String].throttle(1, 1.second, 1, ThrottleMode.Shaping)
+    // Conflate: a single flow stage (1 input, 1 output), not a fan-in of two streams.
+    // NOTE(review): the second function literal starts on its own line; check that it
+    // is really parsed as an argument of conflate and not as a separate expression.
+    val conflateFlow: Flow[String, String, NotUsed] =
+      Flow[String].conflate((x: String, y: String) => x: String)
+      ((acc: String, x: String) => s"$acc::$x")
+
+    val printFlow: Flow[(String, String), String, NotUsed] =
+      Flow[(String, String)].map {
+        x =>
+          println(s" lengths are : ${x._1.length} and ${x._2.length}  ;  ${x._1} zip ${x._2}")
+          x.toString
+      }
+
+    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+
+      val zipping = b.add(Zip[String, String]())
+
+      sourceA ~> throttler ~> zipping.in0
+      sourceB ~> conflateFlow ~> zipping.in1
+
+      zipping.out ~> printFlow ~> Sink.ignore
+
+      ClosedShape
+    })
+
+    graph.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
new file mode 100644
index 0000000..b80398c
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.ClosedShape
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+ 
+/**
+ * Stream example showing Broadcast and Merge
+ *
+ * The numbers 1 to 10 are mapped, broadcast to two branches that each map again,
+ * merged back together, mapped once more and printed.
+ */
+object Test11 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    // Use the configuration handed to main, as the other examples in this package do.
+    implicit val system = ActorSystem("Test11", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+//    implicit val materializer =
+//    ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+    implicit val ec = system.dispatcher
+
+    val g = RunnableGraph.fromGraph(GraphDSL.create() {
+      implicit builder: GraphDSL.Builder[NotUsed] =>
+
+      import GraphDSL.Implicits._
+      val in = Source(1 to 10)
+      // Prints each element with a "**** " prefix.
+      val output: (Any) => Unit = any => {
+        val s = s"**** $any"
+        println(s)
+      }
+      val out = Sink.foreach(output)
+
+      val broadcast = builder.add(Broadcast[Int](2))
+      val merge = builder.add(Merge[Int](2))
+
+      // Four flows, each adding 10 to its input.
+      val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
+
+      in ~> f1 ~> broadcast ~> f2 ~> merge ~> f3 ~> out
+      broadcast ~> f4 ~> merge
+
+      ClosedShape
+    })
+
+    g.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
new file mode 100644
index 0000000..a9e8b08
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.stream.{ClosedShape, UniformFanInShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+ 
+/**
+ * Partial source, sink example
+ *
+ * Builds a reusable partial graph that picks the maximum of three inputs, feeds it
+ * the values 1, 2 and 3, and prints the value received by a `Sink.head`.
+ */
+object Test12 extends AkkaApp with ArgumentsParser{
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    import scala.concurrent.duration._
+
+    // Use the configuration handed to main, as the other examples in this package do.
+    implicit val system = ActorSystem("Test12", akkaConf)
+//    implicit val materializer = ActorMaterializer(
+//      ActorMaterializerSettings(system).withAutoFusing(false)
+//    )
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+
+    // Two chained ZipWith(max) stages exposed as one shape with three inlets.
+    val pickMaxOfThree = GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+
+      val zip1 = b.add(ZipWith[Int, Int, Int](math.max))
+      val zip2 = b.add(ZipWith[Int, Int, Int](math.max))
+
+      zip1.out ~> zip2.in0
+
+      UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
+    }
+
+    val resultSink = Sink.head[Int]
+
+    val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit b =>
+      sink =>
+        import GraphDSL.Implicits._
+
+        // Importing the partial shape will return its shape (inlets & outlets)
+        val pm3 = b.add(pickMaxOfThree)
+
+        Source.single(1) ~> pm3.in(0)
+        Source.single(2) ~> pm3.in(1)
+        Source.single(3) ~> pm3.in(2)
+
+        pm3.out ~> sink.in
+
+        ClosedShape
+    })
+
+    val max: Future[Int] = g.run()
+    // foreach rather than map: the callback is run only for its side effect.
+    max.foreach(x => println(s"maximum of three numbers : $x"))
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
new file mode 100644
index 0000000..984c861
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.time._
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.Implicits._
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.Random
+
+/**
+ * GroupBy example
+ */
+
+/*
+// Original example
+val f = Source
+  .tick(0.seconds, 1.second, "")
+  .map { _ =>
+    val now = System.currentTimeMillis()
+    val delay = random.nextInt(8)
+    MyEvent(now - delay * 1000L)
+  }
+  .statefulMapConcat { () =>
+    val generator = new CommandGenerator()
+    ev => generator.forEvent(ev)
+  }
+  .groupBy(64, command => command.w)
+  .takeWhile(!_.isInstanceOf[CloseWindow])
+  .fold(AggregateEventData((0L, 0L), 0))({
+    case (agg, OpenWindow(window)) => agg.copy(w = window)
+    // always filtered out by takeWhile
+    case (agg, CloseWindow(_)) => agg
+    case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
+  })
+  .async
+  .mergeSubstreams
+  .runForeach { agg =>
+    println(agg.toString)
+  }
+ */
+object Test13 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+
+  /**
+   * Builds a stream that emits one randomly delayed event per second, expands every event
+   * into window commands, groups the commands by window (groupBy2), folds each group into
+   * an AggregateEventData and prints it. The stream is run with the GearpumpMaterializer.
+   *
+   * @param akkaConf configuration used to create the "Test13" actor system
+   * @param args command line arguments (not read by this example)
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    // Use the configuration handed to this entry point instead of ignoring the parameter.
+    implicit val system = ActorSystem("Test13", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val random = new Random()
+
+    Source
+      .tick(0.seconds, 1.second, "tick data")
+      .map { _ =>
+        // Simulate late arrival: move the event time 0 to 7 seconds into the past.
+        val now = System.currentTimeMillis()
+        val delay = random.nextInt(8)
+        MyEvent(now - delay * 1000L)
+      }
+      .statefulMapConcat { () =>
+        // One generator per materialization; it keeps the watermark and the open windows.
+        val generator = new CommandGenerator()
+        ev => generator.forEvent(ev)
+      }
+      .groupBy2(command => command.w)
+      .takeWhile(!_.isInstanceOf[CloseWindow])
+      .fold(AggregateEventData((0L, 0L), 0))({
+        case (agg, OpenWindow(window)) => agg.copy(w = window)
+        // always filtered out by takeWhile
+        case (agg, CloseWindow(_)) => agg
+        case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
+      })
+      .runForeach(agg =>
+        println(agg.toString)
+      )
+
+    // Keep the JVM alive until the actor system terminates; nothing here terminates it.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** An event carrying its event time in epoch milliseconds. */
+  case class MyEvent(timestamp: Long)
+
+  /** A window as a (start, end) pair of epoch-millisecond timestamps. */
+  type Window = (Long, Long)
+
+  /** Sliding-window parameters and the mapping from a timestamp to its windows. */
+  object Window {
+    val WindowLength = 10.seconds.toMillis
+    val WindowStep = 1.second.toMillis
+    val WindowsPerEvent = (WindowLength / WindowStep).toInt
+
+    /**
+     * Returns the WindowsPerEvent windows of length WindowLength whose start times are
+     * consecutive multiples of WindowStep, the last one starting at ts rounded down to a
+     * multiple of WindowStep.
+     */
+    def windowsFor(ts: Long): Set[Window] = {
+      val firstWindowStart = ts - ts % WindowStep - WindowLength + WindowStep
+      (for (i <- 0 until WindowsPerEvent) yield
+        (firstWindowStart + i * WindowStep,
+          firstWindowStart + i * WindowStep + WindowLength)
+        ).toSet
+    }
+  }
+
+  /** A command addressed to the window `w`. */
+  sealed trait WindowCommand {
+    def w: Window
+  }
+
+  case class OpenWindow(w: Window) extends WindowCommand
+
+  case class CloseWindow(w: Window) extends WindowCommand
+
+  case class AddToWindow(ev: MyEvent, w: Window) extends WindowCommand
+
+  /**
+   * Turns events into window commands while tracking a watermark and the currently open
+   * windows. Holds mutable state and is not synchronized.
+   */
+  class CommandGenerator {
+    private val MaxDelay = 5.seconds.toMillis
+    private var watermark = 0L
+    private val openWindows = mutable.Set[Window]()
+
+    /**
+     * Advances the watermark to `ev.timestamp - MaxDelay` if that is later, then returns the
+     * commands caused by `ev`: OpenWindow for windows not yet open, CloseWindow for open
+     * windows that do not contain the event and end before the watermark, and one
+     * AddToWindow per window of the event. Events older than the watermark are dropped
+     * (a message is printed) and yield no commands.
+     */
+    def forEvent(ev: MyEvent): List[WindowCommand] = {
+      watermark = math.max(watermark, ev.timestamp - MaxDelay)
+      if (ev.timestamp < watermark) {
+        println(s"Dropping event with timestamp: ${tsToString(ev.timestamp)}")
+        Nil
+      } else {
+        val eventWindows = Window.windowsFor(ev.timestamp)
+
+        // Take a snapshot of the windows to close before touching openWindows: removing
+        // elements from a mutable set while traversing that same set is not safe.
+        val expired = openWindows
+          .filter(ow => !eventWindows.contains(ow) && ow._2 < watermark)
+          .toList
+        openWindows --= expired
+        val closeCommands = expired.map(ow => CloseWindow(ow))
+
+        // eventWindows is immutable, so it can be traversed while openWindows is updated.
+        val newlyOpened = eventWindows.filterNot(openWindows.contains).toList
+        openWindows ++= newlyOpened
+        val openCommands = newlyOpened.map(w => OpenWindow(w))
+
+        val addCommands = eventWindows.map(w => AddToWindow(ev, w)).toList
+
+        openCommands ++ closeCommands ++ addCommands
+      }
+    }
+  }
+
+  /** Number of events counted for window `w`, with a human readable rendering. */
+  case class AggregateEventData(w: Window, eventCount: Int) {
+    override def toString: String =
+      s"Between ${tsToString(w._1)} and ${tsToString(w._2)}, there were $eventCount events."
+  }
+
+  /** Formats an epoch-millisecond timestamp as a local time in the system default zone. */
+  def tsToString(ts: Long): String = OffsetDateTime
+    .ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
+    .toLocalTime
+    .toString
+  // scalastyle:on println
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
new file mode 100644
index 0000000..0542f43
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.io.File
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl._
+import akka.util.ByteString
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+object Test14 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+
+  /**
+   * Streams the running products of 1 to 100 as strings through a two-way broadcast into
+   * two line sinks: "factorial1.txt" directly, and "factorial2.txt" behind a buffer of 50
+   * elements and a throttle of one element per second.
+   *
+   * @param akkaConf configuration used to create the "Test14" actor system
+   * @param args command line arguments (not read by this example)
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test14", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    // Prints every line prefixed with the file name and also writes it to that file.
+    def lineSink(filename: String): Sink[String, Future[IOResult]] = {
+      val console = Sink.foreach[String](line => println(s"$filename: $line"))
+      val file = FileIO.toPath(new File(filename).toPath)
+      Flow[String]
+        .alsoTo(console)
+        .map(line => ByteString(line + "\n"))
+        .toMat(file)(Keep.right)
+    }
+
+    val numbers: Source[Int, NotUsed] = Source(1 to 100)
+    val factorials: Source[BigInt, NotUsed] = numbers.scan(BigInt(1))(_ * _)
+    val sink1 = lineSink("factorial1.txt")
+    val sink2 = lineSink("factorial2.txt")
+
+    // One element per second, burst of one, delaying (shaping) instead of failing.
+    def onePerSecond: Flow[String, String, NotUsed] =
+      Flow[String].throttle(1, 1.second, 1, ThrottleMode.shaping)
+
+    // Throttled variant without a buffer; built for comparison but not wired below.
+    val slowSink2 = Flow[String].via(onePerSecond).toMat(sink2)(Keep.right)
+    val bufferedSink2 = Flow[String]
+      .buffer(50, OverflowStrategy.backpressure)
+      .via(onePerSecond)
+      .toMat(sink2)(Keep.right)
+
+    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+      val fanOut = b.add(Broadcast[String](2))
+      factorials.map(_.toString) ~> fanOut.in
+      fanOut.out(0) ~> sink1
+      fanOut.out(1) ~> bufferedSink2
+      ClosedShape
+    })
+
+    g.run()
+
+    // Keep the JVM alive until the actor system terminates; nothing here terminates it.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
new file mode 100644
index 0000000..c2f8d5f
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+object Test15 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  /**
+   * Runs a small graph containing a cycle (merge "F" feeds back into merge "C") on either
+   * the GearpumpMaterializer or a non-fusing ActorMaterializer, depending on the
+   * "gearpump" command line option.
+   *
+   * @param akkaConf configuration used to create the "Test15" actor system
+   * @param args command line arguments, parsed for the "gearpump" flag
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val parsed = parse(args)
+    implicit val system = ActorSystem("Test15", akkaConf)
+    val useGearpump = parsed.getBoolean("gearpump")
+    implicit val materializer: ActorMaterializer =
+      if (useGearpump) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+      }
+
+    import akka.stream.scaladsl.GraphDSL.Implicits._
+    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
+      val start = builder.add(Source.single(0)).out
+      val fanOut = builder.add(Broadcast[Int](2))
+      val mergeC = builder.add(Merge[Int](2).named("C"))
+      val plusOne = builder.add(Flow[Int].map(_ + 1).named("D"))
+      val balance = builder.add(Balance[Int](2).named("E"))
+      val mergeF = builder.add(Merge[Int](2).named("F"))
+      val printer = builder.add(Sink.foreach(println).named("G")).in
+
+      // The wiring order is kept as is: it decides which port of each junction is used.
+      mergeC <~ mergeF
+      start ~> fanOut ~> mergeC ~> mergeF
+      fanOut ~> plusOne ~> balance ~> mergeF
+      balance ~> printer
+
+      ClosedShape
+    })
+    graph.run()
+
+    // Keep the JVM alive until the actor system terminates; nothing here terminates it.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
new file mode 100644
index 0000000..eb0b5c7
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
+import akka.stream.scaladsl.Sink
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.{CollectionDataSource, LoggerSink}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * All remote
+ */
+object Test16 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+
+  /**
+   * Reads a fixed list of item names through a GearSource, keeps those starting with
+   * "red", turns each into an order message and writes it to a GearSink backed by a
+   * LoggerSink.
+   *
+   * @param akkaConf configuration used to create the "Test16" actor system
+   * @param args command line arguments (not read by this example)
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test16", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val loggerSink = GearSink.to(new LoggerSink[String])
+    val items = new CollectionDataSource(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
+    val itemSource = GearSource.from[String](items)
+
+    itemSource
+      .filter(item => item.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+      .runWith(loggerSink)
+
+    // Keep the JVM alive until the actor system terminates; nothing here terminates it.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
new file mode 100644
index 0000000..21f1b8c
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl._
+import akka.stream.{ActorMaterializer, ClosedShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ *
+ * This tests how different Materializers can be used together in an explicit way.
+ *
+ */
+object Test2 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+
+  /**
+   * Runs a filter/map flow between bridge endpoints on the GearpumpMaterializer, then uses
+   * a separate ActorMaterializer to push a fixed list into the bridge entry and forward
+   * whatever leaves the bridge exit to an Echo actor.
+   *
+   * @param akkaConf configuration used to create the "Test2" actor system
+   * @param args command line arguments; parsed, but the result is not used
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test2", akkaConf)
+    val gearpumpMaterializer = GearpumpMaterializer()
+
+    val echo = system.actorOf(Props(new Echo()))
+    val bridgeIn = GearSource.bridge[String, String]
+    val bridgeOut = GearSink.bridge[String, String]
+
+    val orderFlow = Flow[String]
+      .filter(item => item.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+    // Materialized explicitly with the Gearpump materializer; yields both bridge endpoints.
+    val (entry, exit) = orderFlow.runWith(bridgeIn, bridgeOut)(gearpumpMaterializer)
+
+    val actorMaterializer = ActorMaterializer()
+
+    val externalSource = Source(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+    )
+    val externalSink = Sink.actorRef(echo, "COMPLETE")
+
+    val outerGraph = RunnableGraph.fromGraph(
+      GraphDSL.create() { implicit b =>
+        import GraphDSL.Implicits._
+        externalSource ~> Sink.fromSubscriber(entry)
+        Source.fromPublisher(exit) ~> externalSink
+        ClosedShape
+      }
+    )
+    // Materialized explicitly with the plain actor materializer.
+    outerGraph.run()(actorMaterializer)
+
+    // Keep the JVM alive until the actor system terminates; nothing here terminates it.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Prints every AnyRef message it receives. */
+  class Echo extends Actor {
+    def receive: Receive = {
+      case message: AnyRef =>
+        println("Confirm received: " + message)
+    }
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
new file mode 100644
index 0000000..0a51078
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSource
+import akka.stream.scaladsl.Sink
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * read from remote and write to local
+ */
+object Test3 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  /**
+   * Reads a fixed list of item names through a GearSource, keeps those starting with
+   * "red", turns each into an order message and sends it to a local Echo actor. The
+   * "gearpump" option selects the GearpumpMaterializer over a non-fusing ActorMaterializer.
+   *
+   * @param akkaConf configuration used to create the "Test3" actor system
+   * @param args command line arguments, parsed for the "gearpump" flag
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val parsed = parse(args)
+    implicit val system = ActorSystem("Test3", akkaConf)
+    val useGearpump = parsed.getBoolean("gearpump")
+    implicit val materializer: ActorMaterializer =
+      if (useGearpump) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+      }
+
+    val echo = system.actorOf(Props(new Echo()))
+    val echoSink = Sink.actorRef(echo, "COMPLETE")
+    val items = new CollectionDataSource(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
+    val itemSource = GearSource.from[String](items)
+
+    itemSource
+      .filter(item => item.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+      .runWith(echoSink)
+
+    // Keep the JVM alive until the actor system terminates; nothing here terminates it.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Prints every AnyRef message it receives. */
+  class Echo extends Actor {
+    def receive: Receive = {
+      case message: AnyRef =>
+        println("Confirm received: " + message)
+    }
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
new file mode 100644
index 0000000..3cb69ce
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSink
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.LoggerSink
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * read from local and write to remote
+ */
+object Test4 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+
+  /**
+   * Streams a fixed list of item names from a plain akka-streams Source, keeps those
+   * starting with "red", turns each into an order message and writes it to a GearSink
+   * backed by a LoggerSink.
+   *
+   * @param akkaConf configuration used to create the "Test4" actor system
+   * @param args command line arguments (not read by this example)
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test4", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val items =
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+    val loggerSink = GearSink.to(new LoggerSink[String])
+
+    Source(items)
+      .filter(item => item.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+      .runWith(loggerSink)
+
+    // Keep the JVM alive until the actor system terminates; nothing here terminates it.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
new file mode 100644
index 0000000..72e21c7
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.ClosedShape
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * test fanout
+ */
+object Test5 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+
+  /**
+   * Fan-out example: a source of (String, String) pairs is split by Unzip and each half
+   * is sent through its own actor-ref sink to the same Echo actor.
+   *
+   * @param akkaConf configuration used to create the "Test5" actor system
+   * @param args command line arguments (not read by this example)
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test5", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val echo = system.actorOf(Props(new Echo()))
+    val source = Source(List(("male", "24"), ("female", "23")))
+
+    RunnableGraph.fromGraph(
+      GraphDSL.create() { implicit b =>
+        import GraphDSL.Implicits._
+        val unzip = b.add(Unzip[String, String]())
+        val sink1 = Sink.actorRef(echo, "COMPLETE")
+        val sink2 = Sink.actorRef(echo, "COMPLETE")
+        source ~> unzip.in
+        unzip.out0 ~> sink1
+        // The second outlet gets its own sink; sink2 was declared but left unconnected.
+        unzip.out1 ~> sink2
+        ClosedShape
+      }
+    ).run()
+
+    // Keep the JVM alive until the actor system terminates; nothing here terminates it.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Prints every AnyRef message it receives. */
+  class Echo extends Actor {
+    def receive: Receive = {
+      case any: AnyRef =>
+        println("Confirm received: " + any)
+    }
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
new file mode 100644
index 0000000..6f54933
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl.Sink
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSource
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * WordCount example.
+ * Tests groupBy2 (the SubFlow-based groupBy is not implemented yet).
+ */
+
+import org.apache.gearpump.akkastream.scaladsl.Implicits._
+
+object Test6 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  /**
+   * Word count: splits a fixed list of sentences on single spaces, groups the words with
+   * groupBy2, sums a count of 1 per word and sends the (word, count) pairs to an Echo
+   * actor. The "gearpump" option selects the GearpumpMaterializer over a non-fusing
+   * ActorMaterializer.
+   *
+   * @param akkaConf configuration used to create the "Test6" actor system
+   * @param args command line arguments, parsed for the "gearpump" flag
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val parsed = parse(args)
+    implicit val system = ActorSystem("Test6", akkaConf)
+    val useGearpump = parsed.getBoolean("gearpump")
+    implicit val materializer: ActorMaterializer =
+      if (useGearpump) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+      }
+
+    val echo = system.actorOf(Props(Echo()))
+    val echoSink = Sink.actorRef(echo, "COMPLETE")
+    val sentences = new CollectionDataSource(
+      List(
+        "this is a good start",
+        "this is a good time",
+        "time to start",
+        "congratulations",
+        "green plant",
+        "blue sky")
+    )
+    val sentenceSource = GearSource.from[String](sentences)
+
+    sentenceSource
+      .mapConcat(sentence => sentence.split(" ").toList)
+      .groupBy2(word => word)
+      .map(word => (word, 1))
+      .reduce((acc, next) => (acc._1, acc._2 + next._2))
+      .log("word-count")
+      .runWith(echoSink)
+
+    // Keep the JVM alive until the actor system terminates; nothing here terminates it.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Prints every AnyRef message it receives. */
+  case class Echo() extends Actor {
+    def receive: Receive = {
+      case message: AnyRef =>
+        println("Confirm received: " + message)
+    }
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
new file mode 100644
index 0000000..be91610
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.{Broadcast, Merge, Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * This is a simplified API you can use to combine sources and sinks
+ * with junctions like: Broadcast[T], Balance[T], Merge[In] and Concat[A]
+ * without the need for using the Graph DSL
+ */
+
+object Test7 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+
+  /**
+   * Merges two single-element sources into one stream and feeds it to a combined sink
+   * that broadcasts every element to two printing sinks.
+   *
+   * @param akkaConf configuration used to create the "Test7" actor system
+   * @param args command-line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test7", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+
+    // Fan-in: both sources are merged into a single source.
+    val mergedSource = Source.combine(Source(List(1)), Source(List(2)))(Merge(_))
+
+    // Fan-out: the combined sink broadcasts each element to both printing sinks.
+    val printToA = Sink.foreach[Int](x => println(s"In SinkA : $x"))
+    val printToB = Sink.foreach[Int](x => println(s"In SinkB : $x"))
+    val broadcastSink = Sink.combine(printToA, printToB)(Broadcast[Int](_))
+
+    mergedSource.runWith(broadcastSink)
+
+    // Block until the actor system terminates, for at most 60 minutes.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
new file mode 100644
index 0000000..434aa33
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer}
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+ 
+/**
+ * Stream example to find sum of elements
+ */
+object Test8 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+
+  // Command-line options: a single optional boolean flag "gearpump" that defaults to false.
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  /**
+   * Sums the integers 1 to 100 with a folding sink and prints the outcome.
+   *
+   * @param akkaConf configuration used to create the "Test8" actor system
+   * @param args command-line arguments; the "gearpump" flag selects the materializer
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    // Local import: only this method needs to pattern match on the Try outcome.
+    import scala.util.{Failure, Success}
+
+    val config = parse(args)
+    implicit val system = ActorSystem("Test8", akkaConf)
+    implicit val materializer: ActorMaterializer =
+      if (config.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(
+          ActorMaterializerSettings(system).withAutoFusing(false)
+        )
+      }
+    implicit val ec = system.dispatcher
+
+    // Source gives 1 to 100 elements
+    val source: Source[Int, NotUsed] = Source(Stream.from(1).take(100))
+    val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
+
+    val result: Future[Int] = source.runWith(sink)
+    // Handle both outcomes explicitly: mapping over the future only for its side effect
+    // would silently drop a failed stream.
+    result.onComplete {
+      case Success(sum) =>
+        println(s"Sum of stream elements => $sum")
+      case Failure(cause) =>
+        println(s"Failed to sum stream elements: $cause")
+    }
+
+    // Block until the actor system terminates, for at most 60 minutes.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
new file mode 100644
index 0000000..63f9e2d
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape}
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+ 
+/**
+ * Stream example showing Broadcast
+ */
+object Test9 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+
+  // Command-line options: a single optional boolean flag "gearpump" that defaults to false.
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  /**
+   * Broadcasts the numbers 1 to 5 to two printing flows, merges their outputs and
+   * sends the merged stream to a [[SinkActor]].
+   *
+   * @param akkaConf configuration used to create the "Test9" actor system
+   * @param args command-line arguments; the "gearpump" flag selects the materializer
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val parsedArgs = parse(args)
+    implicit val system = ActorSystem("Test9", akkaConf)
+    implicit val materializer: ActorMaterializer =
+      if (parsedArgs.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+      }
+    implicit val ec = system.dispatcher
+
+    val sinkActor = system.actorOf(Props(new SinkActor()))
+    val source = Source(1 to 5)
+    // "COMPLETE" is the message handed to Sink.actorRef as its completion message.
+    val sink = Sink.actorRef(sinkActor, "COMPLETE")
+
+    // Both flows pass the element through unchanged and only log that they saw it.
+    val flowA: Flow[Int, Int, NotUsed] = Flow[Int].map { x =>
+      println(s"processing broadcasted element : $x in flowA")
+      x
+    }
+    val flowB: Flow[Int, Int, NotUsed] = Flow[Int].map { x =>
+      println(s"processing broadcasted element : $x in flowB")
+      x
+    }
+
+    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
+      import GraphDSL.Implicits._
+      val fanOut = builder.add(Broadcast[Int](2))
+      val fanIn = builder.add(Merge[Int](2))
+      source ~> fanOut
+      fanOut ~> flowA ~> fanIn
+      fanOut ~> flowB ~> fanIn
+      fanIn ~> sink
+      ClosedShape
+    })
+
+    graph.run()
+
+    // Block until the actor system terminates, for at most 60 minutes.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Actor that prints every message it receives to standard output. */
+  class SinkActor extends Actor {
+    def receive: Receive = {
+      case message: AnyRef => println(s"Confirm received: $message")
+    }
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
new file mode 100644
index 0000000..7e2211d
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.io.{File, FileInputStream}
+import java.util.zip.GZIPInputStream
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.scaladsl._
+import akka.stream.{ClosedShape, IOResult}
+import akka.util.ByteString
+import org.apache.gearpump.akkastream.graph.GraphPartitioner
+import org.apache.gearpump.akkastream.{GearAttributes, GearpumpMaterializer}
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+import org.json4s.JsonAST.JString
+import org.json4s.jackson.JsonMethods
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
+
+/**
+ * This example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/
+ * and showcases running the Akka Streams DSL across JVMs on Gearpump.
+ *
+ * Usage: output/target/pack/bin/gear app
+ *  -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar
+ *  -input wikidata-${DATE}-all.json.gz -languages en,de
+ *
+ * (Note: Wikipedia data can be downloaded from https://dumps.wikimedia.org/wikidatawiki/entities/)
+ *
+ */
+object WikipediaApp extends ArgumentsParser with AkkaApp {
+
+  /**
+   * A Wikidata item reduced to its id and the titles of its site links.
+   *
+   * @param id value of the item's "id" field
+   * @param sites page title keyed by language code, for the languages requested by the caller
+   */
+  case class WikidataElement(id: String, sites: Map[String, String])
+
+  // Command-line options, both required: "input" is the path of the Wikidata JSON dump
+  // (main() reads it through a GZIPInputStream) and "languages" is a comma-separated
+  // list of language codes.
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "input" -> CLIOption[String]("<Wikidata JSON dump>", required = true),
+    "languages" -> CLIOption[String]("<languages to take into account>", required = true)
+  )
+
+  /**
+   * Reads the Wikidata dump, parses every line into a [[WikidataElement]] and counts
+   * how many items have the same title in all requested languages.
+   *
+   * The parsed elements are broadcast to two branches: one logs every 1000th element,
+   * the other compares the titles and feeds the counting sink whose materialized value
+   * is printed once the stream completes.
+   *
+   * @param akkaConf configuration used to create the "WikipediaApp" actor system
+   * @param args command-line arguments; "input" and "languages" are required
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val parsed = parse(args)
+    val input = new File(parsed.getString("input"))
+    val langs = parsed.getString("languages").split(",")
+
+    implicit val system = ActorSystem("WikipediaApp", akkaConf)
+    implicit val materializer =
+      GearpumpMaterializer(GraphPartitioner.TagAttributeStrategy)
+    import system.dispatcher
+
+    val elements = source(input).via(parseJson(langs))
+
+    // Passing `count` to GraphDSL.create makes its Future[(Int, Int)] the materialized
+    // value of the whole graph.
+    val g = RunnableGraph.fromGraph(
+      GraphDSL.create(count) { implicit b =>
+        sinkCount => {
+          import GraphDSL.Implicits._
+          val broadcast = b.add(Broadcast[WikidataElement](2))
+          elements ~> broadcast ~> logEveryNSink(1000)
+          broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
+          ClosedShape
+        }
+      }
+    )
+
+    g.run().onComplete {
+      case Success((t, f)) => printResults(t, f)
+      // scalastyle:off println
+      // Include the cause so that a failed run can actually be diagnosed.
+      case Failure(tr) => println(s"Something went wrong: $tr")
+      // scalastyle:on println
+    }
+    // Block until the actor system terminates, for at most 60 minutes.
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /**
+   * Creates a source that emits the lines of a gzip-compressed text file.
+   *
+   * @param file gzip-compressed file whose content is decoded as UTF-8
+   * @return a source of newline-delimited lines; it materializes the IOResult future
+   *         of the underlying stream source
+   */
+  def source(file: File): Source[String, Future[IOResult]] = {
+    // Open the file inside the factory rather than up front: the stream is then created
+    // once per materialization instead of being opened eagerly and shared, and no file
+    // handle is opened when the source is never run.
+    StreamConverters
+      .fromInputStream(() => new GZIPInputStream(new FileInputStream(file), 65536))
+      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
+      .map(x => x.decodeString("utf-8"))
+  }
+
+  /**
+   * Flow that parses each input line with [[parseItem]] and emits only the lines
+   * that produced an element.
+   *
+   * Lines are parsed in futures, up to 8 at a time, and results are emitted unordered.
+   *
+   * @param langs language codes handed to [[parseItem]]
+   * @param ec execution context on which the parsing futures run
+   */
+  def parseJson(langs: Seq[String])(implicit ec: ExecutionContext):
+  Flow[String, WikidataElement, NotUsed] = {
+    val parallelism = 8
+    Flow[String]
+      .mapAsyncUnordered(parallelism) { line => Future(parseItem(langs, line)) }
+      .collect { case Some(element) => element }
+  }
+
+  /**
+   * Parses one line of the dump into a [[WikidataElement]].
+   *
+   * @param langs language codes whose site links are looked up
+   * @param line one line of the dump, expected to hold a JSON document
+   * @return None when the line cannot be parsed as JSON, when its "id" field is not a
+   *         JSON string, or when no requested language has a site link title;
+   *         otherwise the element with one title per language that was found
+   */
+  def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = {
+    // A parse failure is turned into None instead of propagating the exception.
+    Try(JsonMethods.parse(line)).toOption.flatMap { json =>
+      json \ "id" match {
+        case JString(itemId) =>
+
+          // For each language, read sitelinks.<lang>wiki.title; languages without a
+          // string title contribute nothing to the result.
+          val sites: Seq[(String, String)] = for {
+            lang <- langs
+            JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title"
+          } yield lang -> title
+
+          if(sites.isEmpty) None
+          else Some(WikidataElement(id = itemId, sites = sites.toMap))
+
+        case _ => None
+      }
+    }
+  }
+
+  /**
+   * Sink that counts the elements it receives and prints every n-th one, starting with
+   * the first (index 0).
+   *
+   * @param n logging interval; the element is printed when its index is a multiple of n
+   * @return a sink materializing the total number of elements seen
+   */
+  def logEveryNSink[T](n: Int): Sink[T, Future[Int]] =
+    Sink.fold[Int, T](0) { (x, y) =>
+      val isLogPoint = x % n == 0
+      if (isLogPoint) {
+        // scalastyle:off println
+        println(s"Processing element $x: $y")
+        // scalastyle:on println
+      }
+      x + 1
+    }
+
+  /**
+   * Flow that keeps only the elements whose site languages are exactly `langs` and
+   * maps each of them to whether all of its titles are equal.
+   *
+   * The flow is tagged with [[GearAttributes.remote]].
+   *
+   * @param langs the exact set of language codes an element must have titles for
+   */
+  def checkSameTitles(langs: Set[String]): Flow[WikidataElement, Boolean, NotUsed] = {
+    val hasExactlyLangs = (element: WikidataElement) => element.sites.keySet == langs
+    val allTitlesEqual = (element: WikidataElement) => {
+      val titles = element.sites.values
+      titles.forall(_ == titles.head)
+    }
+    Flow[WikidataElement]
+      .filter(hasExactlyLangs)
+      .map(allTitlesEqual)
+      .withAttributes(GearAttributes.remote)
+  }
+
+  /**
+   * Sink that tallies booleans.
+   *
+   * @return a sink materializing (number of `true` elements, number of `false` elements)
+   */
+  def count: Sink[Boolean, Future[(Int, Int)]] =
+    Sink.fold[(Int, Int), Boolean]((0, 0)) { (tally, sameTitle) =>
+      val (t, f) = tally
+      if (sameTitle) (t + 1, f) else (t, f + 1)
+    }
+
+  /**
+   * Prints the two counters and their ratios to standard output.
+   *
+   * @param t number of items whose titles are all the same
+   * @param f number of items whose titles differ
+   */
+  def printResults(t: Int, f: Int): Unit = {
+    val total = t + f
+    // With no compared items the ratios are undefined; print "n/a" instead of "NaN / NaN".
+    val ratios =
+      if (total == 0) "n/a" else s"${t.toDouble / total} / ${f.toDouble / total}"
+    val message = s"""
+      | Number of items with the same title: $t
+      | Number of items with the different title: $f
+      | Ratios: $ratios
+      """.stripMargin
+    // scalastyle:off println
+    println(message)
+    // scalastyle:on println
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
new file mode 100644
index 0000000..c1e95bb
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.stream.{Shape, SinkShape, SourceShape}
+import org.apache.gearpump.akkastream.GearAttributes
+import org.apache.gearpump.akkastream.GearAttributes.{Local, Location, Remote}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
+import org.apache.gearpump.akkastream.module._
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.fusing.GraphStageModule
+import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource}
+import akka.stream.impl.{SinkModule, SourceModule}
+import org.apache.gearpump.util.Graph
+
+/**
+ *
+ * GraphPartitioner is used to decide which part will be rendered locally
+ * and which part should be rendered remotely.
+ *
+ * We will cut the graph based on the [[Strategy]] provided.
+ *
+ * For example, for the following graph, we can cut the graph to
+ * two parts, each part will be a Sub Graph. The top SubGraph
+ * can be materialized remotely. The bottom part can be materialized
+ * locally.
+ *
+ *        AtomicModule2 -> AtomicModule4
+ *           /|                 \
+ *          /                    \
+ * -----------cut line -------------cut line ----------
+ *       /                        \
+ *     /                           \|
+ * AtomicModule1           AtomicModule5
+ *     \                   /|
+ *      \                 /
+ *       \|              /
+ *         AtomicModule3
+ *
+ * @see [[akka.stream.impl.MaterializerSession]] for more information of how Graph is organized.
+ *
+ */
+class GraphPartitioner(strategy: Strategy) {
+
+  /**
+   * Splits the module graph into a remote and a local sub graph.
+   *
+   * Dummy modules are removed from a copy of the input, every remaining module is tagged
+   * with the [[Location]] chosen by the strategy, and the graph is then cut along the tags.
+   *
+   * @param moduleGraph the module graph to partition
+   * @return the remote sub graph followed by the local sub graph
+   */
+  def partition(moduleGraph: Graph[Module, Edge]): List[SubGraph] = {
+    val withoutDummies = removeDummyModule(moduleGraph)
+    doPartition(withoutDummies, tag(withoutDummies, strategy))
+  }
+
+  /**
+   * Distributes vertices and edges over a local and a remote graph according to `tags`.
+   *
+   * An edge whose two ends live on different sides is cut by a newly created bridge
+   * module, unless the remote end of that edge already is a bridge module.
+   */
+  private def doPartition(graph: Graph[Module, Edge], tags: Map[Module, Location]):
+  List[SubGraph] = {
+    val localGraph = Graph.empty[Module, Edge]
+    val remoteGraph = Graph.empty[Module, Edge]
+
+    // Every vertex goes to one side; anything not tagged Local is placed in the remote graph.
+    graph.vertices.foreach { module =>
+      val target = if (tags(module) == Local) localGraph else remoteGraph
+      target.addVertex(module)
+    }
+
+    graph.edges.foreach { nodeEdgeNode =>
+      val (from, edge, to) = nodeEdgeNode
+      (tags(from), tags(to)) match {
+        case (Local, Local) =>
+          localGraph.addEdge(nodeEdgeNode)
+        case (Remote, Remote) =>
+          remoteGraph.addEdge(nodeEdgeNode)
+        case (Local, Remote) =>
+          to match {
+            case _: BridgeModule[_, _, _] =>
+              // The remote end already is a bridge: keep the edge on the local side.
+              localGraph.addEdge(from, edge, to)
+            case _ =>
+              // Cut the edge with a source bridge: local node -> bridge -> remote node.
+              val bridge = new SourceBridgeModule[AnyRef, AnyRef]()
+              remoteGraph.addEdge(bridge, Edge(bridge.outPort, edge.to), to)
+              localGraph.addEdge(from, Edge(edge.from, bridge.inPort), bridge)
+          }
+        case (Remote, Local) =>
+          from match {
+            case _: BridgeModule[_, _, _] =>
+              // The remote end already is a bridge: keep the edge on the local side.
+              localGraph.addEdge(from, edge, to)
+            case _ =>
+              // Cut the edge with a sink bridge: remote node -> bridge -> local node.
+              val bridge = new SinkBridgeModule[AnyRef, AnyRef]()
+              remoteGraph.addEdge(from, Edge(edge.from, bridge.inPort), bridge)
+              localGraph.addEdge(bridge, Edge(bridge.outPort, edge.to), to)
+          }
+      }
+    }
+
+    List(new RemoteGraph(remoteGraph), new LocalGraph(localGraph))
+  }
+
+  /** Maps every vertex of the graph to the location the strategy assigns to it. */
+  private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] =
+    graph.vertices.map(module => module -> strategy(module)).toMap
+
+  /** Returns a copy of the input graph from which all [[DummyModule]] vertices are removed. */
+  private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = {
+    val pruned = inputGraph.copy
+    val dummies = pruned.vertices.filter {
+      case _: DummyModule => true
+      case _ => false
+    }
+    dummies.foreach(dummy => pruned.removeVertex(dummy))
+    pruned
+  }
+}
+
+object GraphPartitioner {
+
+  /** Assigns a [[Location]] (local or remote) to a module. */
+  type Strategy = PartialFunction[Module, Location]
+
+  /**
+   * Bridge, Gearpump task and groupBy modules are remote; source and sink modules are
+   * local; any other module is placed by its shape: source and sink shapes are local,
+   * every other shape is remote.
+   *
+   * NOTE(review): the last case, `remaining: Module`, matches every module, so this
+   * partial function looks defined for all inputs. Strategies built with
+   * `BaseStrategy orElse ...` would then never reach their fallback cases; confirm
+   * whether that is intended.
+   */
+  val BaseStrategy: Strategy = {
+    case source: BridgeModule[_, _, _] =>
+      Remote
+    case task: GearpumpTaskModule =>
+      Remote
+    case groupBy: GroupByModule[_, _] =>
+      // TODO: groupBy is not supported by local materializer
+      Remote
+    case source: SourceModule[_, _] =>
+      Local
+    case sink: SinkModule[_, _] =>
+      Local
+    case remaining: Module =>
+      remaining.shape match {
+        case sourceShape: SourceShape[_] =>
+          Local
+        case sinkShape: SinkShape[_] =>
+          Local
+        case otherShapes: Shape =>
+          Remote
+      }
+  }
+
+  /**
+   * [[BaseStrategy]] with a fallback that renders everything remotely except
+   * materialized-value sources and single sources.
+   *
+   * NOTE(review): the fallback appears unreachable, see [[BaseStrategy]].
+   */
+  val AllRemoteStrategy: Strategy = BaseStrategy orElse {
+    case graphStageModule: GraphStageModule =>
+      graphStageModule.stage match {
+        case matValueSource: MaterializedValueSource[_] =>
+          Local
+        case singleSource: SingleSource[_] =>
+          Local
+        case _ =>
+          Remote
+      }
+    case _ =>
+      Remote
+  }
+
+  /**
+   * Will decide whether to render a module locally or remotely
+   * based on Attribute settings.
+   *
+   * NOTE(review): the attribute lookup is only the fallback after [[BaseStrategy]],
+   * which appears to match every module first; confirm that attributes are honoured.
+   */
+  val TagAttributeStrategy: Strategy = BaseStrategy orElse {
+    case module =>
+      GearAttributes.location(module.attributes)
+  }
+
+  /**
+   * [[BaseStrategy]] with a fallback that renders every module locally.
+   *
+   * NOTE(review): the fallback appears unreachable, see [[BaseStrategy]]; in addition
+   * both branches of the inner match return Local.
+   */
+  val AllLocalStrategy: Strategy = BaseStrategy orElse {
+    case graphStageModule: GraphStageModule =>
+      // TODO kasravi review
+      graphStageModule.stage match {
+        case matValueSource: MaterializedValueSource[_] =>
+          Local
+        case _ =>
+          Local
+      }
+    case _ =>
+      Local
+  }
+
+  /** Creates a partitioner that cuts graphs according to the given strategy. */
+  def apply(strategy: Strategy): GraphPartitioner = {
+    new GraphPartitioner(strategy)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
new file mode 100644
index 0000000..c03fce2
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.actor.ActorSystem
+import akka.stream.impl.Stages.DefaultAttributes
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.{PublisherSource, SubscriberSink}
+import akka.stream.{SinkShape, SourceShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.materializer.LocalMaterializerImpl
+import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule}
+import org.apache.gearpump.util.Graph
+import org.reactivestreams.{Publisher, Subscriber}
+
+/**
+ *
+ * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph which only
+ * contains modules that can be materialized in the local JVM.
+ *
+ * @param graph Graph[Module, Edge]
+ */
+class LocalGraph(override val graph: Graph[Module, Edge]) extends SubGraph
+
+object LocalGraph {
+
+  /**
+   * materialize LocalGraph in local JVM
+   * @param system ActorSystem
+   */
+  class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer {
+
+    // create a local materializer
+    val materializer = LocalMaterializerImpl()(system)
+
+    /**
+     * Replaces the bridge modules of the sub graph by reactive-streams endpoints and
+     * materializes the resulting graph with the local materializer.
+     *
+     * @param graph the sub graph to materialize
+     * @param matValues Materialized Values for each module before materialization
+     * @return Materialized Values for each Module after the materialization.
+     */
+    override def materialize(graph: SubGraph,
+        matValues: scala.collection.mutable.Map[Module, Any]):
+        scala.collection.mutable.Map[Module, Any] = {
+      val newGraph: Graph[Module, Edge] = graph.graph.mapVertex {
+        // A source bridge becomes a sink wrapping the Subscriber that matValues holds
+        // for it; the value is cast without a check, so it must already be a Subscriber.
+        case source: SourceBridgeModule[in, out] =>
+          val subscriber = matValues(source).asInstanceOf[Subscriber[in]]
+          val shape: SinkShape[in] = SinkShape(source.inPort)
+          new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape)
+        // A sink bridge becomes a source wrapping the Publisher that matValues holds
+        // for it; the value is cast without a check, so it must already be a Publisher.
+        case sink: SinkBridgeModule[in, out] =>
+          val publisher = matValues(sink).asInstanceOf[Publisher[out]]
+          val shape: SourceShape[out] = SourceShape(sink.outPort)
+          new PublisherSource(publisher, DefaultAttributes.publisherSource, shape)
+        // Every other module is kept as it is.
+        case other =>
+          other
+      }
+      materializer.materialize(newGraph, matValues)
+    }
+
+    /** Shuts down the local materializer created by this instance. */
+    override def shutdown: Unit = {
+      materializer.shutdown()
+    }
+  }
+}
+