You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/12 06:31:03 UTC

incubator-gearpump git commit: update akkastream against latests Graph DSL

Repository: incubator-gearpump
Updated Branches:
  refs/heads/akka-streams bc3940352 -> f1bec6709


update akkastream against latests Graph DSL

Author: manuzhang <ow...@gmail.com>

Closes #97 from manuzhang/akka-streams-new.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/f1bec670
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/f1bec670
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/f1bec670

Branch: refs/heads/akka-streams
Commit: f1bec67098b80f9bffe719f885f8d089344a5579
Parents: bc39403
Author: manuzhang <ow...@gmail.com>
Authored: Wed Oct 12 14:30:49 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Oct 12 14:30:49 2016 +0800

----------------------------------------------------------------------
 .../src/main/resources/geardefault.conf         |  2 +-
 .../akkastream/GearpumpMaterializer.scala       | 64 ++++++++++----------
 .../gearpump/akkastream/example/Test.scala      |  3 +-
 .../akkastream/example/WikipediaApp.scala       | 12 ++--
 .../materializer/RemoteMaterializerImpl.scala   | 40 ++++++------
 .../gearpump/akkastream/scaladsl/Api.scala      |  2 +-
 .../gearpump/akkastream/task/Unzip2Task.scala   |  2 +-
 project/Build.scala                             | 18 +++---
 project/Pack.scala                              | 16 ++---
 .../gearpump/streaming/StreamApplication.scala  |  2 +-
 .../apache/gearpump/streaming/dsl/plan/OP.scala | 19 ++++--
 11 files changed, 92 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/resources/geardefault.conf b/experiments/akkastream/src/main/resources/geardefault.conf
index e9da531..56524d4 100644
--- a/experiments/akkastream/src/main/resources/geardefault.conf
+++ b/experiments/akkastream/src/main/resources/geardefault.conf
@@ -4,5 +4,5 @@ gearpump.serializers {
   "scala.collection.immutable.Map$Map2" = ""
 }
 akka {
-  version = "2.4.10"
+  version = "2.4.11"
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
index 75dc95a..9ff701c 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
@@ -29,14 +29,13 @@ import akka.stream.impl.Stages.SymbolicGraphStage
 import akka.stream.impl.StreamLayout._
 import akka.stream.impl._
 import akka.stream.impl.fusing.{GraphInterpreterShell, GraphStageModule}
-import akka.stream.scaladsl.ModuleExtractor
 import akka.stream.stage.GraphStage
 import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
 import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
 import org.apache.gearpump.akkastream.graph.LocalGraph.LocalGraphMaterializer
 import org.apache.gearpump.akkastream.graph.RemoteGraph.RemoteGraphMaterializer
 import org.apache.gearpump.akkastream.graph._
-import org.apache.gearpump.akkastream.util.MaterializedValueOps
+import org.apache.gearpump.util.{Graph => GGraph}
 
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContextExecutor, Promise}
@@ -137,7 +136,7 @@ class GearpumpMaterializer(override val system: ActorSystem,
 
   override def logger: LoggingAdapter = Logging.getLogger(system, this)
 
-  override def isShutdown: Boolean = system.isTerminated
+  override def isShutdown: Boolean = system.whenTerminated.isCompleted
 
   override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
     import ActorAttributes._
@@ -177,7 +176,6 @@ class GearpumpMaterializer(override val system: ActorSystem,
       Nil)
 
     val info = Fusing.aggressive(runnableGraph).module.info
-    import _root_.org.apache.gearpump.util.{Graph => GGraph}
     val graph = GGraph.empty[Module, Edge]
 
     info.allModules.foreach(module => {
@@ -204,33 +202,7 @@ class GearpumpMaterializer(override val system: ActorSystem,
     })
 
     if(Debug) {
-      val iterator = graph.topologicalOrderIterator
-      while (iterator.hasNext) {
-        val module = iterator.next()
-        // scalastyle:off println
-        module match {
-          case graphStageModule: GraphStageModule =>
-            graphStageModule.stage match {
-              case symbolicGraphStage: SymbolicGraphStage[_, _, _] =>
-                val symbolicName = symbolicGraphStage.symbolicStage.getClass.getSimpleName
-                println(
-                  s"${module.getClass.getSimpleName}(${symbolicName})"
-                )
-              case graphStage: GraphStage[_] =>
-                val name = graphStage.getClass.getSimpleName
-                println(
-                  s"${module.getClass.getSimpleName}(${name})"
-                )
-              case other =>
-                println(
-                  s"${module.getClass.getSimpleName}(${other.getClass.getSimpleName})"
-                )
-            }
-          case _ =>
-            println(module.getClass.getSimpleName)
-        }
-        // scalastyle:on println
-      }
+      printGraph(graph)
     }
 
     val subGraphs = GraphPartitioner(strategy).partition(graph)
@@ -266,6 +238,36 @@ class GearpumpMaterializer(override val system: ActorSystem,
     rt.getOrElse(null).asInstanceOf[Mat]
   }
 
+  private def printGraph(graph: GGraph[Module, Edge]): Unit = {
+    val iterator = graph.topologicalOrderIterator
+    while (iterator.hasNext) {
+      val module = iterator.next()
+      // scalastyle:off println
+      module match {
+        case graphStageModule: GraphStageModule =>
+          graphStageModule.stage match {
+            case symbolicGraphStage: SymbolicGraphStage[_, _, _] =>
+              val symbolicName = symbolicGraphStage.symbolicStage.getClass.getSimpleName
+              println(
+                s"${module.getClass.getSimpleName}(${symbolicName})"
+              )
+            case graphStage: GraphStage[_] =>
+              val name = graphStage.getClass.getSimpleName
+              println(
+                s"${module.getClass.getSimpleName}(${name})"
+              )
+            case other =>
+              println(
+                s"${module.getClass.getSimpleName}(${other.getClass.getSimpleName})"
+              )
+          }
+        case _ =>
+          println(module.getClass.getSimpleName)
+      }
+      // scalastyle:on println
+    }
+  }
+
   override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
       subflowFuser: GraphInterpreterShell => ActorRef): Mat = {
     materialize(runnableGraph)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
index 2ce4e19..40cd556 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
@@ -21,6 +21,7 @@ package org.apache.gearpump.akkastream.example
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.stream.scaladsl.{Sink, Source}
 import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.graph.GraphPartitioner
 import org.apache.gearpump.cluster.main.ArgumentsParser
 import org.apache.gearpump.util.AkkaApp
 
@@ -37,7 +38,7 @@ object Test extends AkkaApp with ArgumentsParser {
   // scalastyle:off println
   override def main(akkaConf: Config, args: Array[String]): Unit = {
     implicit val system = ActorSystem("Test", akkaConf)
-    implicit val materializer = GearpumpMaterializer()
+    implicit val materializer = GearpumpMaterializer(GraphPartitioner.AllRemoteStrategy)
 
     val echo = system.actorOf(Props(new Echo()))
     val sink = Sink.actorRef(echo, "COMPLETE")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
index 7e2211d..830f278 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
@@ -81,13 +81,11 @@ object WikipediaApp extends ArgumentsParser with AkkaApp {
       }
     )
 
-    g.run().onComplete { x =>
-      x match {
-        case Success((t, f)) => printResults(t, f)
-        // scalastyle:off println
-        case Failure(tr) => println("Something went wrong")
-        // scalastyle:on println
-      }
+    g.run().onComplete {
+      case Success((t, f)) => printResults(t, f)
+      // scalastyle:off println
+      case Failure(tr) => println("Something went wrong")
+      // scalastyle:on println
     }
     Await.result(system.whenTerminated, 60.minutes)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
index f3f8094..936ac29 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -28,14 +28,16 @@ import akka.stream.impl.{HeadOptionStage, Stages, Throttle}
 import akka.stream.scaladsl._
 import akka.stream.stage.AbstractStage.PushPullGraphStageWithMaterializedValue
 import akka.stream.stage.GraphStage
-import akka.stream.{FanInShape, FanOutShape}
 import org.apache.gearpump.akkastream.GearAttributes
 import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
 import org.apache.gearpump.akkastream.module._
 import org.apache.gearpump.akkastream.task._
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.dsl.op._
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
 import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
 import org.apache.gearpump.util.Graph
 import org.slf4j.LoggerFactory
@@ -96,14 +98,14 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
       vertex.shape.inlets.flatMap { inlet =>
         graph.incomingEdgesOf(vertex).find(
           _._2.to == inlet).map(_._1
-        ).flatMap(processorIds.get(_))
+        ).flatMap(processorIds.get)
       }.toList
     }
     def outProcessors(vertex: Module): List[ProcessorId] = {
       vertex.shape.outlets.flatMap { outlet =>
         graph.outgoingEdgesOf(vertex).find(
           _._2.from == outlet).map(_._3
-        ).flatMap(processorIds.get(_))
+        ).flatMap(processorIds.get)
       }.toList
     }
     processorIds.get(vertex).map(processorId => {
@@ -165,6 +167,8 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
           reduceOp(reduce.f, conf)
         case graphStage: GraphStageModule =>
           translateGraphStageWithMaterializedValue(graphStage, parallelism, conf)
+        case _ =>
+          null
       }
       if (op == null) {
         throw new UnsupportedOperationException(
@@ -174,12 +178,11 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
       op
     }.mapEdge[OpEdge] { (n1, edge, n2) =>
       n2 match {
-        case master: MasterOp =>
-          Shuffle
-        case slave: SlaveOp[_] if n1.isInstanceOf[ProcessorOp[_]] =>
-          Shuffle
-        case slave: SlaveOp[_] =>
+        case chainableOp: ChainableOp[_, _]
+          if !n1.isInstanceOf[ProcessorOp[_]] && !n2.isInstanceOf[ProcessorOp[_]] =>
           Direct
+        case _ =>
+          Shuffle
       }
     }
     (opGraph, matValues.toMap)
@@ -237,7 +240,8 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
           withValue(FoldTask.AGGREGATOR, fold.f)
         ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold")
       case groupBy: GroupBy[_, _] =>
-        GroupByOp(groupBy.keyFor, groupBy.maxSubstreams, "groupBy", conf)
+        GroupByOp(GroupAlsoByWindow(groupBy.keyFor, CountWindow.apply(1).accumulating),
+          groupBy.maxSubstreams, "groupBy", conf)
       case groupedWithin: GroupedWithin[_] =>
         val diConf = conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d).
           withInt(GroupedWithinTask.BATCH_SIZE, groupedWithin.n)
@@ -318,11 +322,11 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
         // TODO
         null
       case unzip: Unzip[_, _] =>
-        ProcessorOp(classOf[Unzip2Task[_, _, _]],
-          parallelism,
-          conf.withValue(
-            Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper)
-          ), "unzip")
+//        ProcessorOp(classOf[Unzip2Task[_, _, _]], parallelism,
+//          conf.withValue(
+//            Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper)), "unzip")
+        // TODO
+        null
       case zip: Zip[_, _] =>
         zipWithOp(zip.zipper, conf)
       case zipWith2: ZipWith2[_, _, _] =>
@@ -474,10 +478,10 @@ object RemoteMaterializerImpl {
 
   def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String,
       conf: UserConfig): Op = {
-    FlatMapOp(fun, description, conf)
+    ChainableOp(new FlatMapFunction[In, Out](fun, description), conf)
   }
 
-  def conflatOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out,
+  def conflateOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out,
     conf: UserConfig): Op = {
     var agg = None: Option[Out]
     val flatMap = {elem: In =>
@@ -489,7 +493,7 @@ object RemoteMaterializerImpl {
       }
       List(agg.get)
     }
-    flatMapOp (flatMap, "conflat", conf)
+    flatMapOp (flatMap, "conflate", conf)
   }
 
   def foldOp[In, Out](zero: Out, fold: (Out, In) => Out, conf: UserConfig): Op = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
index 85b1d5e..80619ef 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
@@ -56,7 +56,7 @@ object GearSource{
    *
    */
   def from[OUT](source: DataSource): Source[OUT, Unit] = {
-    val taskSource = new Source[OUT, Unit](new SourceTaskModule(source, UserConfig.empty))
+    val taskSource = new Source[OUT, Unit](SourceTaskModule(source, UserConfig.empty))
     taskSource
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
index 99f1b55..7dd91fc 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
@@ -40,7 +40,7 @@ class Unzip2Task[In, A1, A2](context: TaskContext, userConf : UserConfig)
 }
 
 object Unzip2Task {
-  case class UnZipFunction[In, A1, A2](val unzip: In => (A1, A2)) extends Serializable
+  case class UnZipFunction[In, A1, A2](unzip: In => (A1, A2)) extends Serializable
 
   val UNZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.unzip2.function"
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index f1e0443..a1e6ca5 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -35,8 +35,7 @@ object Build extends sbt.Build {
 
   val copySharedSourceFiles = TaskKey[Unit]("copied shared services source code")
 
-  val akkaVersion = "2.4.10"
-  val akkaStreamVersion = "2.4-SNAPSHOT"
+  val akkaVersion = "2.4.11"
   val apacheRepo = "https://repository.apache.org/"
   val hadoopVersion = "2.6.0"
   val hbaseVersion = "1.0.0"
@@ -148,10 +147,8 @@ object Build extends sbt.Build {
       "commons-logging" % "commons-logging" % commonsLoggingVersion,
       "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion
         exclude("com.typesafe.akka", "akka-stream_2.11"),
-      "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion,
       "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided"
-    ),
-    dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion
+    )
   )
 
   val coreDependencies = Seq(
@@ -353,7 +350,6 @@ object Build extends sbt.Build {
       "com.github.scribejava" % "scribejava-apis" % "2.4.0",
       "com.ning" % "async-http-client" % "1.9.33",
       "org.webjars" % "angularjs" % "1.4.9",
-      "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
 
       // angular 1.5 breaks ui-select, but we need ng-touch 1.5
       "org.webjars.npm" % "angular-touch" % "1.5.0",
@@ -417,12 +413,12 @@ object Build extends sbt.Build {
     settings = commonSettings ++ noPublish ++
       Seq(
         libraryDependencies ++= Seq(
-          "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion,
+          "com.typesafe.akka" %% "akka-stream" % akkaVersion,
+          "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
           "org.json4s" %% "json4s-jackson" % "3.2.11",
           "org.scalatest" %% "scalatest" % scalaTestVersion % "test"
-        ),
-        dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion
-      )) 
+        )
+      ))
       .dependsOn (services % "test->test; compile->compile", daemon % "test->test; compile->compile")
       .disablePlugins(sbtassembly.AssemblyPlugin)
 
@@ -436,7 +432,7 @@ object Build extends sbt.Build {
         ),
         mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test")
       ))
-    .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+      .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
 
   lazy val storm = Project(
     id = "gearpump-experiments-storm",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index 47d3064..1c87653 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -69,8 +69,7 @@ object Pack extends sbt.Build {
           "worker" -> "org.apache.gearpump.cluster.main.Worker",
           "services" -> "org.apache.gearpump.services.main.Services",
           "yarnclient" -> "org.apache.gearpump.experiments.yarn.client.Client",
-          "storm" -> "org.apache.gearpump.experiments.storm.StormRunner",
-          "akkastream" -> "org.apache.gearpump.akkastream.example.Test11"
+          "storm" -> "org.apache.gearpump.experiments.storm.StormRunner"
         ),
         packJvmOpts := Map(
           "gear" -> Seq("-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}"),
@@ -110,13 +109,7 @@ object Pack extends sbt.Build {
           "storm" -> Seq(
             "-server",
             "-Djava.net.preferIPv4Stack=true",
-            "-Dgearpump.home=${PROG_HOME}"),
-
-          "akkastream" -> Seq(
-            "-server",
-            "-Djava.net.preferIPv4Stack=true",
-            "-Dgearpump.home=${PROG_HOME}",
-            "-Djava.rmi.server.hostname=localhost")
+            "-Dgearpump.home=${PROG_HOME}")
         ),
         packLibDir := Map(
           "lib" -> new ProjectsToPack(core.id, streaming.id),
@@ -148,14 +141,13 @@ object Pack extends sbt.Build {
           "worker" -> daemonClassPath,
           "services" -> serviceClassPath,
           "yarnclient" -> yarnClassPath,
-          "storm" -> stormClassPath,
-          "akkstream" -> daemonClassPath
+          "storm" -> stormClassPath
         ),
 
         packArchivePrefix := projectName + "-" + scalaBinaryVersion.value,
         packArchiveExcludes := Seq("integrationtest")
 
       )
-  ).dependsOn(core, streaming, services, yarn, storm, akkastream).
+  ).dependsOn(core, streaming, services, yarn, storm).
     disablePlugins(sbtassembly.AssemblyPlugin)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index a6588a1..66ec873 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -123,7 +123,7 @@ object LifeTime {
  */
 class StreamApplication(
     override val name: String, val inputUserConfig: UserConfig,
-    dag: Graph[ProcessorDescription, PartitionerDescription])
+    val dag: Graph[ProcessorDescription, PartitionerDescription])
   extends Application {
 
   require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 744976b..b2c5506 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -25,7 +25,8 @@ import org.apache.gearpump.streaming.Processor.DefaultProcessor
 import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
 import org.apache.gearpump.streaming.{Constants, Processor}
 import org.apache.gearpump.streaming.dsl.task.TransformTask
-import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.dsl.window.api.{CountWindow, GroupByFn}
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
 import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
 import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
 import org.apache.gearpump.streaming.task.Task
@@ -124,11 +125,11 @@ case class DataSinkOp(
  * to another Op to be used
  */
 case class ChainableOp[IN, OUT](
-    fn: SingleInputFunction[IN, OUT]) extends Op {
+    fn: SingleInputFunction[IN, OUT],
+    userConfig: UserConfig = UserConfig.empty) extends Op {
 
   override def description: String = fn.description
 
-  override def userConfig: UserConfig = UserConfig.empty
 
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
@@ -141,7 +142,17 @@ case class ChainableOp[IN, OUT](
   }
 
   override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
-    throw new UnsupportedOperationException("ChainedOp cannot be translated to Processor")
+    Processor[TransformTask[Any, Any]](1, description,
+      userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, fn))
+  }
+}
+
+object GroupByOp {
+
+  def apply[IN, GROUP](groupBy: IN => GROUP, parallelism: Int,
+      description: String, userConfig: UserConfig): Op = {
+    GroupByOp(GroupAlsoByWindow(groupBy, CountWindow.apply(1).accumulating), parallelism,
+      description, userConfig)
   }
 }