You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/01/20 12:43:43 UTC

[3/5] incubator-gearpump git commit: [GEARPUMP-22] Merge akka-streams branch into master

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
new file mode 100644
index 0000000..826cdcf
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.{ClosedShape, ThrottleMode}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+ 
+/**
+ * Stream example showing Conflate and Throttle.
+ *
+ * Source "A" is throttled to one element per second, source "B" passes through a
+ * conflate stage, and the two are zipped pairwise and printed.
+ */
+object Test10 extends AkkaApp with ArgumentsParser {
+
+  // scalastyle:off println
+  /**
+   * Builds and runs the example graph, then waits (up to 60 minutes) for the actor
+   * system to terminate.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    // Use the configuration passed in by the caller, like the other examples do.
+    implicit val system = ActorSystem("Test10", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+
+    // Endless stream that repeats the same element.
+    def stream(x: String) = Stream.continually(x)
+
+    val sourceA = Source(stream("A"))
+    val sourceB = Source(stream("B"))
+
+    // Lets one element through per second, delaying (not failing) faster input.
+    val throttler: Flow[String, String, NotUsed] =
+      Flow[String].throttle(1, 1.second, 1, ThrottleMode.Shaping)
+    // conflate is a one-input/one-output stage: while downstream is slower than
+    // upstream it combines the pending elements with the given function instead of
+    // back-pressuring. Here the already pending element (x) is kept and the newer
+    // one (y) is dropped.
+    val conflateFlow: Flow[String, String, NotUsed] =
+      Flow[String].conflate((x: String, y: String) => x: String)
+
+    // Prints each zipped pair and passes its string form downstream.
+    val printFlow: Flow[(String, String), String, NotUsed] =
+      Flow[(String, String)].map {
+        x =>
+          println(s" lengths are : ${x._1.length} and ${x._2.length}  ;  ${x._1} zip ${x._2}")
+          x.toString
+      }
+
+    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+
+      val zipping = b.add(Zip[String, String]())
+
+      sourceA ~> throttler ~> zipping.in0
+      sourceB ~> conflateFlow ~> zipping.in1
+
+      zipping.out ~> printFlow ~> Sink.ignore
+
+      ClosedShape
+    })
+
+    graph.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
new file mode 100644
index 0000000..087c57d
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.ClosedShape
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+ 
+/**
+ * Stream example showing Broadcast and Merge.
+ *
+ * The numbers 1 to 10 are broadcast into two branches that are merged again.
+ * Every element passes three `+ 10` stages on either branch, so each input is
+ * printed twice with 30 added.
+ */
+object Test11 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Builds and runs the broadcast/merge graph, then waits (up to 60 minutes) for
+   * the actor system to terminate.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    // Use the configuration passed in by the caller, like the other examples do.
+    implicit val system = ActorSystem("Test11", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+    // implicit val materializer =
+    //   ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+    implicit val ec = system.dispatcher
+
+    val g = RunnableGraph.fromGraph(GraphDSL.create() {
+      implicit builder: GraphDSL.Builder[NotUsed] =>
+
+      import GraphDSL.Implicits._
+      val in = Source(1 to 10)
+      // Prints every received element with a "**** " prefix.
+      val output: (Any) => Unit = any => {
+        val s = s"**** $any"
+        println(s)
+      }
+      val out = Sink.foreach(output)
+
+      val broadcast = builder.add(Broadcast[Int](2))
+      val merge = builder.add(Merge[Int](2))
+
+      // Four independent stages that each add 10 to the element.
+      val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
+
+      in ~> f1 ~> broadcast ~> f2 ~> merge ~> f3 ~> out
+      broadcast ~> f4 ~> merge
+
+      ClosedShape
+    })
+
+    g.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
new file mode 100644
index 0000000..b4f4bce
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.stream.{ClosedShape, UniformFanInShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+ 
+/**
+ * Partial source, sink example.
+ *
+ * Builds a reusable partial graph that picks the maximum of three inputs and
+ * feeds it with three single-element sources.
+ */
+object Test12 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Runs the "maximum of three" graph, prints the result once it is available and
+   * then waits (up to 60 minutes) for the actor system to terminate.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    import scala.concurrent.duration._
+
+    // Use the configuration passed in by the caller, like the other examples do.
+    implicit val system = ActorSystem("Test12", akkaConf)
+    // implicit val materializer = ActorMaterializer(
+    //   ActorMaterializerSettings(system).withAutoFusing(false)
+    //   )
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+
+    // Partial graph with three inlets and one outlet: max(max(in0, in1), in2).
+    val pickMaxOfThree = GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+
+      val zip1 = b.add(ZipWith[Int, Int, Int](math.max))
+      val zip2 = b.add(ZipWith[Int, Int, Int](math.max))
+
+      zip1.out ~> zip2.in0
+
+      UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
+    }
+
+    val resultSink = Sink.head[Int]
+
+    val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit b =>
+      sink =>
+        import GraphDSL.Implicits._
+
+        // Importing the partial shape will return its shape (inlets & outlets)
+        val pm3 = b.add(pickMaxOfThree)
+
+        Source.single(1) ~> pm3.in(0)
+        Source.single(2) ~> pm3.in(1)
+        Source.single(3) ~> pm3.in(2)
+
+        pm3.out ~> sink.in
+
+        ClosedShape
+    })
+
+    val max: Future[Int] = g.run()
+    // foreach, not map: the callback is run only for its side effect.
+    max.foreach(x => println(s"maximum of three numbers : $x"))
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
new file mode 100644
index 0000000..2e036cb
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.time._
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.Implicits._
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.Random
+
+/**
+ * GroupBy example
+ */
+
+/*
+// Original example
+val f = Source
+  .tick(0.seconds, 1.second, "")
+  .map { _ =>
+    val now = System.currentTimeMillis()
+    val delay = random.nextInt(8)
+    MyEvent(now - delay * 1000L)
+  }
+  .statefulMapConcat { () =>
+    val generator = new CommandGenerator()
+    ev => generator.forEvent(ev)
+  }
+  .groupBy(64, command => command.w)
+  .takeWhile(!_.isInstanceOf[CloseWindow])
+  .fold(AggregateEventData((0L, 0L), 0))({
+    case (agg, OpenWindow(window)) => agg.copy(w = window)
+    // always filtered out by takeWhile
+    case (agg, CloseWindow(_)) => agg
+    case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
+  })
+  .async
+  .mergeSubstreams
+  .runForeach { agg =>
+    println(agg.toString)
+  }
+ */
+/**
+ * Windowed event counting on top of `groupBy2`.
+ *
+ * A tick per second is turned into an event with a randomly delayed timestamp,
+ * the event is translated into window commands, and the commands are grouped by
+ * window and folded into a per-window event count that is printed.
+ */
+object Test13 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+
+  /**
+   * Starts the windowing stream and waits (up to 60 minutes) for the actor system
+   * to terminate.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    // Use the configuration passed in by the caller, like the other examples do.
+    implicit val system = ActorSystem("Test13", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val random = new Random()
+
+    val result = Source
+      .tick(0.seconds, 1.second, "tick data")
+      .map { _ =>
+        // Simulate late arrival: the event is 0 to 7 seconds older than "now".
+        val now = System.currentTimeMillis()
+        val delay = random.nextInt(8)
+        MyEvent(now - delay * 1000L)
+      }
+      .statefulMapConcat { () =>
+        // One generator per materialization; it keeps watermark and open windows.
+        val generator = new CommandGenerator()
+        ev => generator.forEvent(ev)
+      }
+      .groupBy2(command => command.w)
+      .takeWhile(!_.isInstanceOf[CloseWindow])
+      .fold(AggregateEventData((0L, 0L), 0))({
+        case (agg, OpenWindow(window)) => agg.copy(w = window)
+        // always filtered out by takeWhile
+        case (agg, CloseWindow(_)) => agg
+        case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
+      })
+      .runForeach(agg =>
+        println(agg.toString)
+      )
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** An event identified only by its timestamp in epoch milliseconds. */
+  case class MyEvent(timestamp: Long)
+
+  /** A time window as a (start, end) pair of epoch milliseconds. */
+  type Window = (Long, Long)
+
+  object Window {
+    val WindowLength = 10.seconds.toMillis
+    val WindowStep = 1.second.toMillis
+    val WindowsPerEvent = (WindowLength / WindowStep).toInt
+
+    /**
+     * Returns the `WindowsPerEvent` windows of length `WindowLength`, spaced
+     * `WindowStep` apart, whose range covers the given timestamp.
+     *
+     * @param ts timestamp in epoch milliseconds
+     */
+    def windowsFor(ts: Long): Set[Window] = {
+      val firstWindowStart = ts - ts % WindowStep - WindowLength + WindowStep
+      (for (i <- 0 until WindowsPerEvent) yield
+        (firstWindowStart + i * WindowStep,
+          firstWindowStart + i * WindowStep + WindowLength)
+        ).toSet
+    }
+  }
+
+  /** A command addressed to the window `w`. */
+  sealed trait WindowCommand {
+    def w: Window
+  }
+
+  case class OpenWindow(w: Window) extends WindowCommand
+
+  case class CloseWindow(w: Window) extends WindowCommand
+
+  case class AddToWindow(ev: MyEvent, w: Window) extends WindowCommand
+
+  /**
+   * Stateful translator from events to window commands. It tracks a watermark
+   * (latest seen timestamp minus `MaxDelay`) and the set of currently open windows.
+   */
+  class CommandGenerator {
+    private val MaxDelay = 5.seconds.toMillis
+    private var watermark = 0L
+    private val openWindows = mutable.Set[Window]()
+
+    /**
+     * Produces the commands caused by one event: open commands for windows seen
+     * for the first time, close commands for open windows that ended before the
+     * watermark, and one add command per window of the event.
+     *
+     * @param ev the incoming event
+     * @return the commands to emit, or `Nil` if the event is older than the watermark
+     */
+    def forEvent(ev: MyEvent): List[WindowCommand] = {
+      watermark = math.max(watermark, ev.timestamp - MaxDelay)
+      if (ev.timestamp < watermark) {
+        println(s"Dropping event with timestamp: ${tsToString(ev.timestamp)}")
+        Nil
+      } else {
+        val eventWindows = Window.windowsFor(ev.timestamp)
+
+        // Collect the expired windows first and remove them afterwards: removing
+        // from a mutable set while iterating over that same set is not safe.
+        val expiredWindows = openWindows
+          .filter(ow => !eventWindows.contains(ow) && ow._2 < watermark)
+          .toList
+        openWindows --= expiredWindows
+        val closeCommands = expiredWindows.map(ow => CloseWindow(ow))
+
+        // eventWindows is immutable, so updating openWindows here is fine.
+        val openCommands = eventWindows.flatMap { w =>
+          if (!openWindows.contains(w)) {
+            openWindows.add(w)
+            Some(OpenWindow(w))
+          } else None
+        }
+
+        val addCommands = eventWindows.map(w => AddToWindow(ev, w))
+
+        openCommands.toList ++ closeCommands ++ addCommands.toList
+      }
+    }
+  }
+
+  /** Number of events counted for the window `w`. */
+  case class AggregateEventData(w: Window, eventCount: Int) {
+    override def toString: String =
+      s"Between ${tsToString(w._1)} and ${tsToString(w._2)}, there were $eventCount events."
+  }
+
+  /** Formats epoch milliseconds as a local time of day in the system default zone. */
+  def tsToString(ts: Long): String = OffsetDateTime
+    .ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
+    .toLocalTime
+    .toString
+  // scalastyle:on println
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
new file mode 100644
index 0000000..c436130
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.io.File
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl._
+import akka.util.ByteString
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+/**
+ * Broadcast example: the running product of 1 to 100 is written to two files,
+ * the second one through a buffer and a one-element-per-second throttle.
+ */
+object Test14 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Builds and runs the graph, then waits (up to 60 minutes) for the actor system
+   * to terminate.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test14", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    // Sink that prints every line prefixed with the file name and also writes it,
+    // newline-terminated, to that file.
+    def lineSink(filename: String): Sink[String, Future[IOResult]] = {
+      Flow[String]
+        .alsoTo(Sink.foreach(s => println(s"$filename: $s")))
+        .map(s => ByteString(s + "\n"))
+        .toMat(FileIO.toPath(new File(filename).toPath))(Keep.right)
+    }
+
+    val source: Source[Int, NotUsed] = Source(1 to 100)
+    val factorials: Source[BigInt, NotUsed] = source.scan(BigInt(1))((acc, next) => acc * next)
+    val sink1 = lineSink("factorial1.txt")
+    val sink2 = lineSink("factorial2.txt")
+    // One element per second; shared by both throttled variants of the second sink.
+    val oneLinePerSecond = Flow[String].throttle(1, 1.second, 1, ThrottleMode.Shaping)
+    // Unbuffered variant of the second sink; not wired into the graph below.
+    val slowSink2 = Flow[String].via(oneLinePerSecond).toMat(sink2)(Keep.right)
+    val bufferedSink2 = Flow[String]
+      .buffer(50, OverflowStrategy.backpressure)
+      .via(oneLinePerSecond)
+      .toMat(sink2)(Keep.right)
+
+    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+      val bcast = b.add(Broadcast[String](2))
+      factorials.map(_.toString) ~> bcast.in
+      bcast.out(0) ~> sink1
+      bcast.out(1) ~> bufferedSink2
+      ClosedShape
+    })
+
+    g.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
new file mode 100644
index 0000000..f4e4dbd
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * Graph example combining Broadcast, Merge and Balance stages, runnable with
+ * either the Gearpump materializer or a plain ActorMaterializer.
+ */
+object Test15 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  // Optional boolean flag "gearpump" (default false) selects the materializer.
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  /**
+   * Parses the arguments, builds and runs the graph, then waits (up to 60 minutes)
+   * for the actor system to terminate.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments; see `options`
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test15", akkaConf)
+    implicit val materializer: ActorMaterializer =
+      if (config.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(
+          ActorMaterializerSettings(system).withAutoFusing(false)
+        )
+      }
+    import akka.stream.scaladsl.GraphDSL.Implicits._
+    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
+      val A = builder.add(Source.single(0)).out
+      val B = builder.add(Broadcast[Int](2))
+      val C = builder.add(Merge[Int](2).named("C"))
+      val D = builder.add(Flow[Int].map(_ + 1).named("D"))
+      val E = builder.add(Balance[Int](2).named("E"))
+      val F = builder.add(Merge[Int](2).named("F"))
+      val G = builder.add(Sink.foreach(println).named("G")).in
+
+      // Keep this wiring order: each connection takes the next free port.
+      C <~ F
+      A ~> B ~> C ~> F
+      B ~> D ~> E ~> F
+      E ~> G
+
+      ClosedShape
+    }).run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
new file mode 100644
index 0000000..9691496
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, LoggerSink}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * All remote: both the source and the sink are Gearpump-side components.
+ */
+object Test16 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Streams a fixed list of items from a Gearpump data source, keeps those that
+   * start with "red", turns them into order messages and sends them to a logger
+   * sink. Then waits (up to 60 minutes) for the actor system to terminate.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test16", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val loggerSink = GearSink.to(new LoggerSink[String])
+    val items = new CollectionDataSource(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
+    val itemSource = GearSource.from[String](items)
+
+    itemSource
+      .filter(item => item.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+      .runWith(loggerSink)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
new file mode 100644
index 0000000..a6049cd
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl._
+import akka.stream.{ActorMaterializer, ClosedShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Shows how two different materializers can be combined explicitly: the
+ * filtering flow is run with the Gearpump materializer, while the surrounding
+ * source and sink are run with a plain ActorMaterializer.
+ */
+object Test2 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Materializes the inner flow between two bridges, connects an external source
+   * and sink to the bridge endpoints, and then waits (up to 60 minutes) for the
+   * actor system to terminate.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments, passed to `parse`
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test2", akkaConf)
+    val remoteMaterializer = GearpumpMaterializer()
+
+    val echoActor = system.actorOf(Props(new Echo()))
+    val bridgeIn = GearSource.bridge[String, String]
+    val bridgeOut = GearSink.bridge[String, String]
+
+    val orderFlow = Flow[String]
+      .filter(item => item.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+    // The inner flow is materialized first; its two endpoints are wired up below.
+    val (entry, exit) = orderFlow.runWith(bridgeIn, bridgeOut)(remoteMaterializer)
+
+    val localMaterializer = ActorMaterializer()
+
+    val items = Source(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+    )
+    val echoSink = Sink.actorRef(echoActor, "COMPLETE")
+
+    RunnableGraph.fromGraph(
+      GraphDSL.create() { implicit b =>
+        import GraphDSL.Implicits._
+        items ~> Sink.fromSubscriber(entry)
+        Source.fromPublisher(exit) ~> echoSink
+        ClosedShape
+      }
+    ).run()(localMaterializer)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Actor that prints every reference-typed message it receives. */
+  class Echo extends Actor {
+    def receive: Receive = {
+      case msg: AnyRef =>
+        println("Confirm received: " + msg)
+    }
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
new file mode 100644
index 0000000..24faeb3
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSource
+import akka.stream.scaladsl.Sink
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Read from remote and write to local: a Gearpump data source feeds a sink that
+ * forwards to a local actor.
+ */
+object Test3 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  // Optional boolean flag "gearpump" (default false) selects the materializer.
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  /**
+   * Parses the arguments, runs the stream, then waits (up to 60 minutes) for the
+   * actor system to terminate.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args command line arguments; see `options`
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test3", akkaConf)
+    implicit val materializer: ActorMaterializer =
+      if (config.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(
+          ActorMaterializerSettings(system).withAutoFusing(false)
+        )
+      }
+    val echo = system.actorOf(Props(new Echo()))
+    // Elements are sent to the echo actor; "COMPLETE" is sent when the stream ends.
+    val sink = Sink.actorRef(echo, "COMPLETE")
+    val sourceData = new CollectionDataSource(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
+    val source = GearSource.from[String](sourceData)
+    source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Actor that prints every reference-typed message it receives. */
+  class Echo extends Actor {
+    def receive: Receive = {
+      case any: AnyRef =>
+        println("Confirm received: " + any)
+    }
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
new file mode 100644
index 0000000..6a44a35
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSink
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.scalaapi.LoggerSink
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * read from local and write to remote
+ */
+object Test4 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test4", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+    // Local in-memory elements pass through Akka stages into a Gearpump LoggerSink.
+    val items =
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+    val redItems = Source(items).filter(item => item.startsWith("red"))
+    val orders = redItems.map(item => "I want to order item: " + item)
+    orders.runWith(GearSink.to(new LoggerSink[String]))
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
new file mode 100644
index 0000000..ad87a97
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.ClosedShape
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * test fanout
+ */
+object Test5 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test5", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val echo = system.actorOf(Props(new Echo()))
+    val source = Source(List(("male", "24"), ("female", "23")))
+
+    // Fan-out: Unzip splits every pair and each half is sent to its own actor sink.
+    RunnableGraph.fromGraph(
+      GraphDSL.create() { implicit b =>
+        import GraphDSL.Implicits._
+        val unzip = b.add(Unzip[String, String]())
+        val sink1 = Sink.actorRef(echo, "COMPLETE")
+        val sink2 = Sink.actorRef(echo, "COMPLETE")
+        source ~> unzip.in
+        unzip.out0 ~> sink1
+        unzip.out1 ~> sink2
+        ClosedShape
+      }
+    ).run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  // Prints every message delivered by the two sinks, including their "COMPLETE" markers.
+  class Echo extends Actor {
+    def receive: Receive = {
+      case any: AnyRef => println("Confirm received: " + any)
+    }
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
new file mode 100644
index 0000000..a525471
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl.Sink
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSource
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ *  WordCount example
+ * Test GroupBy2 (groupBy which uses SubFlow is not implemented yet)
+ */
+
+import org.apache.gearpump.akkastream.scaladsl.Implicits._
+
+object Test6 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val parsedArgs = parse(args)
+    implicit val system = ActorSystem("Test6", akkaConf)
+    implicit val materializer: ActorMaterializer =
+      if (parsedArgs.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+      }
+
+    val echoActor = system.actorOf(Props(new Echo()))
+    val actorSink = Sink.actorRef(echoActor, "COMPLETE")
+    val lines = List(
+      "this is a good start",
+      "this is a good time",
+      "time to start",
+      "congratulations",
+      "green plant",
+      "blue sky"
+    )
+
+    // Split lines into words, groupBy2 on the word itself, then add up a count of 1 per word.
+    val words = GearSource.from[String](new CollectionDataSource(lines))
+      .mapConcat(line => line.split(" ").toList)
+    val counts = words
+      .groupBy2(word => word)
+      .map(word => (word, 1))
+      .reduce { (left, right) =>
+        (left._1, left._2 + right._2)
+      }
+    counts.log("word-count").runWith(actorSink)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  // Actor that prints whatever message is delivered to it.
+  case class Echo() extends Actor {
+    def receive: Receive = {
+      case message: AnyRef =>
+        println("Confirm received: " + message)
+    }
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
new file mode 100644
index 0000000..8c837af
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.{Broadcast, Merge, Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * This is a simplified API you can use to combine sources and sinks
+ * with junctions like: Broadcast[T], Balance[T], Merge[In] and Concat[A]
+ * without the need for using the Graph DSL
+ */
+
+object Test7 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test7", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+
+    // Fan-in via Merge, then fan-out: Broadcast passes each element to both sinks.
+    val first = Source(List(1))
+    val second = Source(List(2))
+    val merged = Source.combine(first, second)(ports => Merge(ports))
+    val printA = Sink.foreach[Int](value => println(s"In SinkA : $value"))
+    val printB = Sink.foreach[Int](value => println(s"In SinkB : $value"))
+    val fanOut = Sink.combine(printA, printB)(ports => Broadcast[Int](ports))
+    merged.runWith(fanOut)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
new file mode 100644
index 0000000..ad2ac61
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer}
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+ 
+/**
+ * Stream example to find sum of elements
+ */
+object Test8 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val parsedArgs = parse(args)
+    implicit val system = ActorSystem("Test8", akkaConf)
+    implicit val materializer: ActorMaterializer =
+      if (parsedArgs.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+      }
+    implicit val ec = system.dispatcher
+
+    // The source emits the integers 1 through 100.
+    val numbers: Source[Int, NotUsed] = Source(Stream.from(1).take(100))
+    // Folding with addition, starting from 0, materializes a Future of the total.
+    val adder: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((acc, next) => acc + next)
+
+    val total: Future[Int] = numbers.runWith(adder)
+    // Runs on the system dispatcher once the fold has completed.
+    total.map { sum =>
+      println(s"Sum of stream elements => $sum")
+    }
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
new file mode 100644
index 0000000..66414e0
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape}
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+ 
+/**
+ * Stream example showing Broadcast
+ */
+object Test9 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val parsedArgs = parse(args)
+    implicit val system = ActorSystem("Test9", akkaConf)
+    implicit val materializer: ActorMaterializer =
+      if (parsedArgs.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+      }
+    implicit val ec = system.dispatcher
+
+    val sinkActor = system.actorOf(Props(new SinkActor()))
+    val numbers = Source(1 to 5)
+    val actorSink = Sink.actorRef(sinkActor, "COMPLETE")
+    // Both flows pass elements through unchanged and only print which branch saw them.
+    val flowA: Flow[Int, Int, NotUsed] = Flow[Int].map { element =>
+      println(s"processing broadcasted element : $element in flowA")
+      element
+    }
+    val flowB: Flow[Int, Int, NotUsed] = Flow[Int].map { element =>
+      println(s"processing broadcasted element : $element in flowB")
+      element
+    }
+
+    // Broadcast copies each element into flowA and flowB; Merge joins the branches again.
+    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
+      import GraphDSL.Implicits._
+      val broadcast = builder.add(Broadcast[Int](2))
+      val merge = builder.add(Merge[Int](2))
+      numbers ~> broadcast
+      broadcast ~> flowA ~> merge
+      broadcast ~> flowB ~> merge
+      merge ~> actorSink
+      ClosedShape
+    })
+    graph.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  class SinkActor extends Actor {
+    def receive: Receive = {
+      case message: AnyRef => println("Confirm received: " + message)
+    }
+  }
+  // scalastyle:on println
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
new file mode 100644
index 0000000..2a1e7ff
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.io.{File, FileInputStream}
+import java.util.zip.GZIPInputStream
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.scaladsl._
+import akka.stream.{ClosedShape, IOResult}
+import akka.util.ByteString
+import org.apache.gearpump.akkastream.graph.GraphPartitioner
+import org.apache.gearpump.akkastream.{GearAttributes, GearpumpMaterializer}
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+import org.json4s.JsonAST.JString
+import org.json4s.jackson.JsonMethods
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
+
+/**
+ * this example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/
+ * which showcases running Akka Streams DSL across JVMs on Gearpump
+ *
+ * Usage: output/target/pack/bin/gear app
+ *  -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar
+ *  -input wikidata-${DATE}-all.json.gz -languages en,de
+ *
+ * (Note: Wikipedia data can be downloaded from https://dumps.wikimedia.org/wikidatawiki/entities/)
+ *
+ */
+object WikipediaApp extends ArgumentsParser with AkkaApp {
+  /** A Wikidata item id together with its page title per requested language. */
+  case class WikidataElement(id: String, sites: Map[String, String])
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "input" -> CLIOption[String]("<Wikidata JSON dump>", required = true),
+    "languages" -> CLIOption[String]("<languages to take into account>", required = true)
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val parsed = parse(args)
+    val input = new File(parsed.getString("input"))
+    val langs = parsed.getString("languages").split(",")
+
+    implicit val system = ActorSystem("WikipediaApp", akkaConf)
+    implicit val materializer = GearpumpMaterializer(GraphPartitioner.TagAttributeStrategy)
+    import system.dispatcher
+    val elements = source(input).via(parseJson(langs))
+    // One broadcast branch logs progress, the other counts items whose titles all match.
+    val g = RunnableGraph.fromGraph(
+      GraphDSL.create(count) { implicit b =>
+        sinkCount => {
+          import GraphDSL.Implicits._
+          val broadcast = b.add(Broadcast[WikidataElement](2))
+          elements ~> broadcast ~> logEveryNSink(1000)
+          broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
+          ClosedShape
+        }
+      }
+    )
+
+    g.run().onComplete {
+      case Success((t, f)) => printResults(t, f)
+      // scalastyle:off println
+      case Failure(tr) => println(s"Something went wrong: $tr")
+      // scalastyle:on println
+    }
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Reads the gzipped file and emits each newline-delimited line decoded as UTF-8. */
+  def source(file: File): Source[String, Future[IOResult]] = {
+    // Opened inside the factory: no file handle is held until the stream actually runs.
+    StreamConverters.fromInputStream(() => new GZIPInputStream(new FileInputStream(file), 65536))
+      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
+      .map(x => x.decodeString("utf-8"))
+  }
+
+  /** Parses up to 8 lines concurrently (order not kept) and keeps only the parsed elements. */
+  def parseJson(langs: Seq[String])(implicit ec: ExecutionContext):
+  Flow[String, WikidataElement, NotUsed] =
+    Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect({
+      case Some(v) => v
+    })
+
+  /** Returns None for invalid JSON, a non-string "id", or no title in any of `langs`. */
+  def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = {
+    Try(JsonMethods.parse(line)).toOption.flatMap { json =>
+      json \ "id" match {
+        case JString(itemId) =>
+          val sites: Seq[(String, String)] = for {
+            lang <- langs
+            JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title"
+          } yield lang -> title
+          if (sites.isEmpty) None else Some(WikidataElement(id = itemId, sites = sites.toMap))
+        case _ => None
+      }
+    }
+  }
+
+  /** Counts received elements, printing one whenever the count so far is a multiple of n. */
+  def logEveryNSink[T](n: Int): Sink[T, Future[Int]] = Sink.fold(0) { (x, y: T) =>
+    if (x % n == 0) {
+      // scalastyle:off println
+      println(s"Processing element $x: $y")
+      // scalastyle:on println
+    }
+    x + 1
+  }
+
+  /** Keeps items that have a title in exactly `langs` and emits whether all titles are equal. */
+  def checkSameTitles(langs: Set[String]):
+    Flow[WikidataElement, Boolean, NotUsed] = Flow[WikidataElement]
+    .filter(_.sites.keySet == langs)
+    .map { x =>
+      val titles = x.sites.values
+      titles.forall( _ == titles.head)
+    }.withAttributes(GearAttributes.remote)
+
+  /** Folds booleans into a (number of true, number of false) pair. */
+  def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
+    case ((t, f), true) => (t + 1, f)
+    case ((t, f), false) => (t, f + 1)
+  }
+
+  /** Prints both counts and their share of the total (NaN when no item was counted). */
+  def printResults(t: Int, f: Int): Unit = {
+    val message = s"""
+      | Number of items with the same title: $t
+      | Number of items with the different title: $f
+      | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)}
+      """.stripMargin
+    // scalastyle:off println
+    println(message)
+    // scalastyle:on println
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
new file mode 100644
index 0000000..f7919c0
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.stream.{Shape, SinkShape, SourceShape}
+import org.apache.gearpump.akkastream.GearAttributes
+import org.apache.gearpump.akkastream.GearAttributes.{Local, Location, Remote}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
+import org.apache.gearpump.akkastream.module._
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.fusing.GraphStageModule
+import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource}
+import akka.stream.impl.{SinkModule, SourceModule}
+import org.apache.gearpump.util.Graph
+
+/**
+ *
+ * GraphPartitioner is used to decide which part will be rendered locally
+ * and which part should be rendered remotely.
+ *
+ * We will cut the graph based on the [[Strategy]] provided.
+ *
+ * For example, for the following graph, we can cut the graph to
+ * two parts, each part will be a Sub Graph. The top SubGraph
+ * can be materialized remotely. The bottom part can be materialized
+ * locally.
+ *
+ *        AtomicModule2 -> AtomicModule4
+ *           /|                 \
+ *          /                    \
+ * -----------cut line -------------cut line ----------
+ *       /                        \
+ *     /                           \|
+ * AtomicModule1           AtomicModule5
+ *     \                   /|
+ *      \                 /
+ *       \|              /
+ *         AtomicModule3
+ *
+ * @see [[akka.stream.impl.MaterializerSession]] for more information of how Graph is organized.
+ *
+ */
+class GraphPartitioner(strategy: Strategy) {
+  /**
+   * Removes dummy modules, tags each remaining module, then splits the graph by tag.
+   *
+   * @param moduleGraph graph of modules to partition; a copy is made before modules are removed
+   * @return the remote sub graph followed by the local sub graph
+   */
+  def partition(moduleGraph: Graph[Module, Edge]): List[SubGraph] = {
+    val cleaned = removeDummyModule(moduleGraph)
+    doPartition(cleaned, tag(cleaned, strategy))
+  }
+
+  /**
+   * Distributes vertices and edges over a remote and a local graph according to `tags`.
+   * Edges crossing the cut get a bridge module inserted unless one end already is a bridge.
+   */
+  private def doPartition(graph: Graph[Module, Edge], tags: Map[Module, Location]):
+  List[SubGraph] = {
+    val local = Graph.empty[Module, Edge]
+    val remote = Graph.empty[Module, Edge]
+
+    graph.vertices.foreach { module =>
+      tags(module) match {
+        case Local => local.addVertex(module)
+        case _ => remote.addVertex(module)
+      }
+    }
+
+    graph.edges.foreach { case whole @ (upstream, edge, downstream) =>
+      (tags(upstream), tags(downstream)) match {
+        case (Local, Local) =>
+          local.addEdge(whole)
+        case (Remote, Remote) =>
+          remote.addEdge(whole)
+        case (Local, Remote) =>
+          downstream match {
+            case _: BridgeModule[_, _, _] =>
+              // the remote end already is a bridge, the edge stays on the local side
+              local.addEdge(upstream, edge, downstream)
+            case _ =>
+              // put a source bridge in between: local -> bridge | bridge -> remote
+              val bridge = new SourceBridgeModule[AnyRef, AnyRef]()
+              remote.addEdge(bridge, Edge(bridge.outPort, edge.to), downstream)
+              local.addEdge(upstream, Edge(edge.from, bridge.inPort), bridge)
+          }
+        case (Remote, Local) =>
+          upstream match {
+            case _: BridgeModule[_, _, _] =>
+              // the remote end already is a bridge, the edge stays on the local side
+              local.addEdge(upstream, edge, downstream)
+            case _ =>
+              // put a sink bridge in between: remote -> bridge | bridge -> local
+              val bridge = new SinkBridgeModule[AnyRef, AnyRef]()
+              remote.addEdge(upstream, Edge(edge.from, bridge.inPort), bridge)
+              local.addEdge(bridge, Edge(bridge.outPort, edge.to), downstream)
+          }
+      }
+    }
+
+    List(new RemoteGraph(remote), new LocalGraph(local))
+  }
+
+  /** Maps every vertex of the graph to the location the strategy assigns to it. */
+  private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] =
+    graph.vertices.map(vertex => vertex -> strategy(vertex)).toMap
+
+  /** Returns a copy of the input graph from which all DummyModule vertices are removed. */
+  private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = {
+    val graph = inputGraph.copy
+    val dummies = graph.vertices.filter {
+      case _: DummyModule => true
+      case _ => false
+    }
+    dummies.foreach(dummy => graph.removeVertex(dummy))
+    graph
+  }
+}
+
+object GraphPartitioner {
+  /** Decides for a module whether it is materialized locally or remotely. */
+  type Strategy = PartialFunction[Module, Location]
+
+  val BaseStrategy: Strategy = {
+    case source: BridgeModule[_, _, _] =>
+      Remote
+    case task: GearpumpTaskModule =>
+      Remote
+    case groupBy: GroupByModule[_, _] =>
+      // TODO: groupBy is not supported by local materializer
+      Remote
+    case source: SourceModule[_, _] =>
+      Local
+    case sink: SinkModule[_, _] =>
+      Local
+    case remaining: Module => // accepts any other Module; its shape picks the location
+      remaining.shape match {
+        case sourceShape: SourceShape[_] =>
+          Local
+        case sinkShape: SinkShape[_] =>
+          Local
+        case otherShapes: Shape =>
+          Remote
+      }
+  }
+  // NOTE(review): BaseStrategy accepts any Module, so this fallback looks unreachable.
+  val AllRemoteStrategy: Strategy = BaseStrategy orElse {
+    case graphStageModule: GraphStageModule =>
+      graphStageModule.stage match {
+        case matValueSource: MaterializedValueSource[_] =>
+          Local
+        case singleSource: SingleSource[_] =>
+          Local
+        case _ =>
+          Remote
+      }
+    case _ =>
+      Remote
+  }
+
+  /**
+   * Falls back to the location stored in the module's attributes, as read by
+   * GearAttributes.location. NOTE(review): BaseStrategy accepts any Module, so this
+   * fallback looks unreachable - confirm whether attributes are meant to take priority.
+   */
+  val TagAttributeStrategy: Strategy = BaseStrategy orElse {
+    case module =>
+      GearAttributes.location(module.attributes)
+  }
+  // Fallback always yields Local. NOTE(review): looks unreachable, BaseStrategy is total.
+  val AllLocalStrategy: Strategy = BaseStrategy orElse {
+    case graphStageModule: GraphStageModule =>
+      // TODO kasravi review
+      graphStageModule.stage match {
+        case matValueSource: MaterializedValueSource[_] =>
+          Local
+        case _ =>
+          Local
+      }
+    case _ =>
+      Local
+  }
+  /** Creates a partitioner that applies the given strategy. */
+  def apply(strategy: Strategy): GraphPartitioner = {
+    new GraphPartitioner(strategy)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
new file mode 100644
index 0000000..fe86951
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.actor.ActorSystem
+import akka.stream.impl.Stages.DefaultAttributes
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.{PublisherSource, SubscriberSink}
+import akka.stream.{SinkShape, SourceShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.materializer.LocalMaterializerImpl
+import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule}
+import org.apache.gearpump.util.Graph
+import org.reactivestreams.{Publisher, Subscriber}
+
+/**
+ *
+ * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph, which only
+ *  contains modules that can be materialized in the local JVM.
+ *
+ * @param graph Graph[Module, Edge]
+ */
+class LocalGraph(override val graph: Graph[Module, Edge]) extends SubGraph
+
+object LocalGraph {
+
+  /**
+   * Materializes a [[LocalGraph]] inside the local JVM.
+   * @param system ActorSystem the local materializer is created with
+   */
+  class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer {
+
+    // the materializer that runs the graph in this JVM
+    val materializer = LocalMaterializerImpl()(system)
+
+    /**
+     * Replaces each bridge module by a reactive-streams sink or source, then materializes.
+     * @param matValues Materialized Values for each module before materialization
+     * @return Materialized Values for each Module after the materialization.
+     */
+    override def materialize(graph: SubGraph,
+        matValues: scala.collection.mutable.Map[Module, Any]):
+        scala.collection.mutable.Map[Module, Any] = {
+      // the value recorded for a bridge is cast to the Subscriber / Publisher to wrap
+      val bridged: Graph[Module, Edge] = graph.graph.mapVertex {
+        case bridge: SourceBridgeModule[in, out] =>
+          val target = matValues(bridge).asInstanceOf[Subscriber[in]]
+          val sinkShape: SinkShape[in] = SinkShape(bridge.inPort)
+          new SubscriberSink(target, DefaultAttributes.subscriberSink, sinkShape)
+        case bridge: SinkBridgeModule[in, out] =>
+          val origin = matValues(bridge).asInstanceOf[Publisher[out]]
+          val sourceShape: SourceShape[out] = SourceShape(bridge.outPort)
+          new PublisherSource(origin, DefaultAttributes.publisherSource, sourceShape)
+        case other => other
+      }
+      materializer.materialize(bridged, matValues)
+    }
+
+    // shuts the local materializer down
+    override def shutdown: Unit =
+      materializer.shutdown()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
new file mode 100644
index 0000000..99ebe17
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.materializer.RemoteMaterializerImpl
+import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule}
+import org.apache.gearpump.akkastream.task.SinkBridgeTask.SinkBridgeTaskClient
+import org.apache.gearpump.akkastream.task.SourceBridgeTask.SourceBridgeTaskClient
+import akka.stream.impl.StreamLayout.Module
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.util.Graph
+
+/**
+ *
+ * [[RemoteGraph]] is a [[SubGraph]] of the application DSL Graph, which only
+ *  contains modules that can be materialized in a remote Gearpump cluster.
+ *
+ * @param graph Graph
+ */
+class RemoteGraph(override val graph: Graph[Module, Edge]) extends SubGraph
+
+object RemoteGraph {
+
+  /**
+   * Materializes a [[RemoteGraph]] on a Gearpump cluster.
+   * @param useInProcessCluster when true, an [[EmbeddedCluster]] is started and used
+   * @param system ActorSystem
+   */
+  class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem)
+    extends SubGraphMaterializer {
+    private val local = if (useInProcessCluster) {
+      val cluster = EmbeddedCluster()
+      cluster.start()
+      Some(cluster)
+    } else {
+      None
+    }
+
+    // client of the embedded cluster when there is one, otherwise created from `system`
+    private val context: ClientContext =
+      local.map(_.newClientContext).getOrElse(ClientContext(system))
+
+    override def materialize(subGraph: SubGraph,
+        inputMatValues: scala.collection.mutable.Map[Module, Any]):
+        scala.collection.mutable.Map[Module, Any] = {
+      val graph = subGraph.graph
+      // an empty sub-graph has nothing to submit, so the input values are returned as-is
+      if (graph.isEmpty) {
+        inputMatValues
+      } else {
+        doMaterialize(graph, inputMatValues)
+      }
+    }
+
+    // Has RemoteMaterializerImpl build the application for the graph, submits it through
+    // the client context and returns the input values extended with the bridge clients.
+    private def doMaterialize(graph: Graph[Module, Edge],
+        inputMatValues: scala.collection.mutable.Map[Module, Any]):
+        scala.collection.mutable.Map[Module, Any] = {
+      val materializer = new RemoteMaterializerImpl(graph, system)
+      val (app, matValues) = materializer.materialize
+
+      val appId = context.submit(app).appId
+      // NOTE(review): a fixed pause stands in for a readiness check of the application
+      // scalastyle:off println
+      println("sleep 5 second until the application is ready on cluster")
+      // scalastyle:on println
+      Thread.sleep(5000)
+
+      // Only bridge modules get a materialized value here: a client created for the
+      // processor id that the materializer reported for the bridge.
+      def resolve(matValues: Map[Module, ProcessorId]): Map[Module, Any] = {
+        matValues.toList.collect {
+          case (source: SourceBridgeModule[_, _], processorId) =>
+            val bridge = new SourceBridgeTaskClient[AnyRef](system.dispatcher,
+              context, appId, processorId)
+            (source, bridge)
+          case (sink: SinkBridgeModule[_, _], processorId) =>
+            val bridge = new SinkBridgeTaskClient(system, context, appId, processorId)
+            (sink, bridge)
+        }.toMap
+      }
+
+      inputMatValues ++ resolve(matValues)
+    }
+
+    override def shutdown: Unit = {
+      // stop the embedded cluster (if any) even when closing the client context throws
+      try context.close()
+      finally local.foreach(_.stop())
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
new file mode 100644
index 0000000..a74143e
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import akka.stream.impl.StreamLayout.Module
+import org.apache.gearpump.util.Graph
+
+/**
+ * [[SubGraph]] is a partial DAG
+ *
+ * The idea is that by dividing a [[Graph]] into several
+ * [[SubGraph]]s, we can materialize each [[SubGraph]] with a different
+ * materializer.
+ */
+
+trait SubGraph {
+
+  /**
+   * the [[Graph]] representation of this SubGraph
+   * @return the graph of this sub-graph, with modules as vertices and [[Edge]]s between them
+   */
+  def graph: Graph[Module, Edge]
+}
+
+
+/**
+ * Materializer for Sub-Graph type
+ */
+trait SubGraphMaterializer {
+  /**
+   * Materializes `graph`, the sub-graph handled by this materializer.
+   * @param matValues Materialized Values for each module before materialization
+   * @return Materialized Values for each Module after the materialization.
+   */
+  def materialize(graph: SubGraph,
+      matValues: scala.collection.mutable.Map[Module, Any]):
+    scala.collection.mutable.Map[Module, Any]
+
+  /** Shuts this materializer down; what is released is up to the implementation. */
+  def shutdown: Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
new file mode 100644
index 0000000..477f4d3
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.materializer
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.{util => ju}
+
+import org.apache.gearpump.util.{Graph => GGraph}
+import akka.actor.{ActorRef, ActorSystem, Cancellable, Deploy, PoisonPill}
+import akka.dispatch.Dispatchers
+import akka.event.{Logging, LoggingAdapter}
+import akka.stream.impl.StreamLayout._
+import akka.stream.impl._
+import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
+import akka.stream.impl.fusing.{ActorGraphInterpreter, Fold, GraphInterpreterShell, GraphModule, GraphStageModule}
+import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
+import akka.stream.scaladsl.ModuleExtractor
+import akka.stream.{ClosedShape, Graph => AkkaGraph, _}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.module.ReduceModule
+import org.apache.gearpump.akkastream.util.MaterializedValueOps
+import org.reactivestreams.{Publisher, Subscriber}
+
+import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * This materializer is functionally equivalent to [[akka.stream.impl.ActorMaterializerImpl]]
+ *
+ * @param system System
+ * @param settings ActorMaterializerSettings
+ * @param dispatchers Dispatchers
+ * @param supervisor ActorRef
+ * @param haveShutDown AtomicBoolean
+ * @param flowNames SeqActorName
+ */
+case class LocalMaterializerImpl (
+    override val system: ActorSystem,
+    override val settings: ActorMaterializerSettings,
+    dispatchers: Dispatchers,
+    override val supervisor: ActorRef,
+    haveShutDown: AtomicBoolean,
+    flowNames: SeqActorName)
+  extends ExtendedActorMaterializer {
+
+  override def logger: LoggingAdapter = Logging.getLogger(system, this)
+
+  override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration,
+      task: Runnable): Cancellable =
+    system.scheduler.schedule(initialDelay, interval, task)(executionContext)
+
+  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
+    system.scheduler.scheduleOnce(delay, task)(executionContext)
+
+  override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
+    import ActorAttributes._
+    import Attributes._
+    // Fold the attributes over the base settings: only input buffer, dispatcher and
+    // supervision strategy attributes change them; log levels, names and any other
+    // attribute leave the accumulated settings as they are.
+    opAttr.attributeList.foldLeft(settings) {
+      case (acc, InputBuffer(initial, max)) => acc.withInputBuffer(initial, max)
+      case (acc, Dispatcher(dispatcher)) => acc.withDispatcher(dispatcher)
+      case (acc, SupervisionStrategy(decider)) => acc.withSupervisionStrategy(decider)
+      case (acc, _: LogLevels) => acc
+      case (acc, _) => acc
+    }
+  }
+
+  override def shutdown(): Unit =
+    if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill
+
+  override def isShutdown: Boolean = haveShutDown.get()
+
+  // the dispatcher named by the settings, or the default dispatcher when none is given
+  override lazy val executionContext: ExecutionContextExecutor = {
+    val id = settings.dispatcher
+    dispatchers.lookup(if (id == Deploy.NoDispatcherGiven) Dispatchers.DefaultDispatcherId else id)
+  }
+
+
+  case class LocalMaterializerSession(module: Module, iAttributes: Attributes,
+      subflowFuser: GraphInterpreterShell => ActorRef = null)
+    extends MaterializerSession(module, iAttributes) {
+    // Materializes one atomic module and assigns the result to the ports of the module.
+    override def materializeAtomic(atomic: AtomicModule,
+        effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {
+      // context for sink/source creation; every call takes a fresh stage name
+      def newMaterializationContext() =
+        new MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes,
+          stageName(effectiveAttributes))
+      atomic match {
+        case sink: SinkModule[_, _] =>
+          val (sub, mat) = sink.create(newMaterializationContext())
+          assignPort(sink.shape.in, sub.asInstanceOf[Subscriber[Any]])
+          matVal.put(atomic, mat)
+        case source: SourceModule[_, _] =>
+          val (pub, mat) = source.create(newMaterializationContext())
+          assignPort(source.shape.out, pub.asInstanceOf[Publisher[Any]])
+          matVal.put(atomic, mat)
+        case stage: ProcessorModule[_, _, _] =>
+          val (processor, mat) = stage.createProcessor()
+          assignPort(stage.inPort, processor)
+          assignPort(stage.outPort, processor.asInstanceOf[Publisher[Any]])
+          matVal.put(atomic, mat)
+        // FIXME TlsModule has no case (draft below); unmatched atomic modules raise a MatchError
+        //        case tls: TlsModule =>
+        // TODO solve this so TlsModule doesn't need special treatment here
+        //          val es = effectiveSettings(effectiveAttributes)
+        //          val props =
+        //            TLSActor.props(es, tls.sslContext, tls.sslConfig,
+        //              tls.firstSession, tls.role, tls.closing, tls.hostInfo)
+        //          val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
+        //          def factory(id: Int) = new ActorPublisher[Any](impl) {
+        //            override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
+        //          }
+        //          val publishers = Vector.tabulate(2)(factory)
+        //          impl ! FanOut.ExposedPublishers(publishers)
+        //
+        //          assignPort(tls.plainOut, publishers(TLSActor.UserOut))
+        //          assignPort(tls.cipherOut, publishers(TLSActor.TransportOut))
+        //
+        //          assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn))
+        //          assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, TLSActor.TransportIn))
+        //
+        //          matVal.put(atomic, NotUsed)
+        case graph: GraphModule =>
+          matGraph(graph, effectiveAttributes, matVal)
+        case stage: GraphStageModule =>
+          val graph =
+            GraphModule(GraphAssembly(stage.shape.inlets, stage.shape.outlets, stage.stage),
+              stage.shape, stage.attributes, Array(stage))
+          matGraph(graph, effectiveAttributes, matVal)
+      }
+    }
+    // Runs a graph module in a GraphInterpreterShell and assigns its inlets and outlets.
+    private def matGraph(graph: GraphModule, effectiveAttributes: Attributes,
+        matVal: ju.Map[Module, Any]): Unit = {
+      val calculatedSettings = effectiveSettings(effectiveAttributes)
+      val (handlers, logics) =
+        graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc)
+
+      val shell = new GraphInterpreterShell(graph.assembly, handlers,
+        logics, graph.shape, calculatedSettings, LocalMaterializerImpl.this)
+      // without a fuser, or at an async boundary, the shell gets an interpreter actor of its own
+      val impl =
+        if (subflowFuser != null && !effectiveAttributes.contains(Attributes.AsyncBoundary)) {
+          subflowFuser(shell)
+        } else {
+          val props = ActorGraphInterpreter.props(shell)
+          actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher)
+        }
+      // every inlet gets a boundary subscriber, every outlet a boundary publisher
+      for ((inlet, i) <- graph.shape.inlets.iterator.zipWithIndex) {
+        val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, shell, i)
+        assignPort(inlet, subscriber)
+      }
+      for ((outlet, i) <- graph.shape.outlets.iterator.zipWithIndex) {
+        val publisher = new ActorGraphInterpreter.BoundaryPublisher(impl, shell, i)
+        impl ! ActorGraphInterpreter.ExposedPublisher(shell, i, publisher)
+        assignPort(outlet, publisher)
+      }
+    }
+  }
+
+  /**
+   * Materializes a closed graph with a [[LocalMaterializerSession]]. The overloads that
+   * take initial attributes and/or a sub-flow fuser hand them on to the session; the
+   * others pass null for what they do not receive.
+   */
+  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): Mat =
+    materialize(runnableGraph, null, null)
+
+  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+      initialAttributes: Attributes): Mat =
+    materialize(runnableGraph, null, initialAttributes)
+
+  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+      subflowFuser: GraphInterpreterShell => ActorRef): Mat =
+    materialize(runnableGraph, subflowFuser, null)
+
+  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+      subflowFuser: (GraphInterpreterShell) => ActorRef, initialAttributes: Attributes): Mat = {
+    // attributes and fuser reach the session unchanged; either of them may be null
+    val topLevel = ModuleExtractor.unapply(runnableGraph).get
+    val session = LocalMaterializerSession(topLevel, initialAttributes, subflowFuser)
+    // the session yields Any; the cast restores the graph's materialized value type
+    session.materialize().asInstanceOf[Mat]
+  }
+
+  override def makeLogger(logSource: Class[_]): LoggingAdapter = {
+    logger
+  }
+
+  /**
+   * Composes every vertex of `graph` into a single module and then wires that module
+   * along every edge of the graph, from the edge's `from` port to its `to` port.
+   */
+  def buildToplevelModule(graph: GGraph[Module, Edge]): Module = {
+    val composed = graph.vertices.foldLeft(EmptyModule: Module) { (inProgress, vertex) =>
+      inProgress.compose(vertex)
+    }
+    graph.edges.foldLeft(composed) { case (inProgress, (_, edge, _)) =>
+      inProgress.wire(edge.from, edge.to)
+    }
+  }
+
+  /**
+   * Materializes every vertex of `graph` as an atomic module, recording the results in
+   * `inputMatValues`, and then publishes to the MaterializedValueSource stages of the graph.
+   *
+   * The session is created over the top-level module built from the graph, but the
+   * vertices are materialized one by one through `materializeAtomic`.
+   *
+   * @param graph modules to materialize; each vertex is cast to AtomicModule
+   * @param inputMatValues materialized values known so far, updated in place
+   * @return the `inputMatValues` instance that was passed in
+   */
+  def materialize(graph: GGraph[Module, Edge],
+      inputMatValues: scala.collection.mutable.Map[Module, Any]):
+      scala.collection.mutable.Map[Module, Any] = {
+    val session = LocalMaterializerSession(buildToplevelModule(graph), null, null)
+    import scala.collection.JavaConverters._
+    // Java view handed to the session; the result is read back through the same view
+    val javaMatValues = inputMatValues.asJava
+    val materializedGraph = graph.mapVertex { vertex =>
+      session.materializeAtomic(vertex.asInstanceOf[AtomicModule], vertex.attributes,
+        javaMatValues)
+      javaMatValues.get(vertex)
+    }
+
+    // an edge is connected only when both of its ends materialized to a Module
+    materializedGraph.edges.foreach {
+      case (upstream: Module, edge, downstream: Module) =>
+        val publisher = upstream.downstreams(edge.from).asInstanceOf[Publisher[Any]]
+        val subscriber = downstream.upstreams(edge.to).asInstanceOf[Subscriber[Any]]
+        publisher.subscribe(subscriber)
+      case _ =>
+    }
+
+    // pick the MaterializedValueSource stages out of the graph stage modules
+    val graphStages = graph.vertices.collect {
+      case stageModule: GraphStageModule => stageModule.stage
+    }
+    val matValSources: List[MaterializedValueSource[_]] = graphStages.collect {
+      case source: MaterializedValueSource[_] => source
+    }
+
+    publishToMaterializedValueSource(matValSources, inputMatValues)
+    inputMatValues
+  }
+
+  // Publishes to each source the value its computation resolves to against `matValues`;
+  // a source whose computation is null is left untouched.
+  private def publishToMaterializedValueSource(modules: List[MaterializedValueSource[_]],
+      matValues: scala.collection.mutable.Map[Module, Any]): Unit = {
+    for (source <- modules; attr <- Option(source.computation)) {
+      val valueToPublish = MaterializedValueOps(attr).resolve(matValues)
+      source.setValue(valueToPublish)
+    }
+  }
+
+  private[this] def createFlowName(): String = flowNames.next()
+
+  val flowName = createFlowName()
+  var nextId = 0
+
+  def stageName(attr: Attributes): String = {
+    val name = s"$flowName-$nextId-${attr.nameOrDefault()}" // <flow name>-<counter>-<stage name>
+    nextId += 1 // NOTE(review): plain var, no synchronization - confirm single-threaded use
+    name
+  }
+
+  override def withNamePrefix(name: String): LocalMaterializerImpl =
+    this.copy(flowNames = flowNames.copy(name))
+
+}
+
+object LocalMaterializerImpl {
+  // a module bundled with its materialized value and its per-port subscribers/publishers
+  case class MaterializedModule(module: Module, matValue: Any,
+      inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]],
+      outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])
+
+  /**
+   * Creates a materializer; missing settings are taken from the actor system and a
+   * missing name prefix defaults to "flow".
+   */
+  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
+      namePrefix: Option[String] = None)(implicit system: ActorSystem):
+  LocalMaterializerImpl = {
+    val resolved = materializerSettings.getOrElse(ActorMaterializerSettings(system))
+    apply(resolved, namePrefix.getOrElse("flow"))(system)
+  }
+
+  /**
+   * Creates a materializer whose supervisor actor is started on the settings' dispatcher.
+   */
+  def apply(materializerSettings: ActorMaterializerSettings,
+      namePrefix: String)(implicit system: ActorSystem): LocalMaterializerImpl = {
+    // one flag instance is given to both the stream supervisor and the materializer
+    val shutDownFlag = new AtomicBoolean(false)
+    val supervisorProps = StreamSupervisor.props(materializerSettings, shutDownFlag)
+      .withDispatcher(materializerSettings.dispatcher)
+    new LocalMaterializerImpl(system, materializerSettings, system.dispatchers,
+      system.actorOf(supervisorProps), shutDownFlag, FlowNames(system).name.copy(namePrefix))
+  }
+
+  /**
+   * Turns a reduce module into a fold that starts from null; while the accumulator is
+   * null the incoming element is taken as the new accumulator without calling `f`.
+   */
+  def toFoldModule(reduce: ReduceModule[Any]): Fold[Any, Any] = {
+    val combine = reduce.f
+    Fold(null, (acc: Any, input: Any) => if (acc == null) input else combine(acc, input))
+  }
+}