You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/12 06:31:03 UTC
incubator-gearpump git commit: update akkastream against latest
Graph DSL
Repository: incubator-gearpump
Updated Branches:
refs/heads/akka-streams bc3940352 -> f1bec6709
update akkastream against latest Graph DSL
Author: manuzhang <ow...@gmail.com>
Closes #97 from manuzhang/akka-streams-new.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/f1bec670
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/f1bec670
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/f1bec670
Branch: refs/heads/akka-streams
Commit: f1bec67098b80f9bffe719f885f8d089344a5579
Parents: bc39403
Author: manuzhang <ow...@gmail.com>
Authored: Wed Oct 12 14:30:49 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Oct 12 14:30:49 2016 +0800
----------------------------------------------------------------------
.../src/main/resources/geardefault.conf | 2 +-
.../akkastream/GearpumpMaterializer.scala | 64 ++++++++++----------
.../gearpump/akkastream/example/Test.scala | 3 +-
.../akkastream/example/WikipediaApp.scala | 12 ++--
.../materializer/RemoteMaterializerImpl.scala | 40 ++++++------
.../gearpump/akkastream/scaladsl/Api.scala | 2 +-
.../gearpump/akkastream/task/Unzip2Task.scala | 2 +-
project/Build.scala | 18 +++---
project/Pack.scala | 16 ++---
.../gearpump/streaming/StreamApplication.scala | 2 +-
.../apache/gearpump/streaming/dsl/plan/OP.scala | 19 ++++--
11 files changed, 92 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/resources/geardefault.conf b/experiments/akkastream/src/main/resources/geardefault.conf
index e9da531..56524d4 100644
--- a/experiments/akkastream/src/main/resources/geardefault.conf
+++ b/experiments/akkastream/src/main/resources/geardefault.conf
@@ -4,5 +4,5 @@ gearpump.serializers {
"scala.collection.immutable.Map$Map2" = ""
}
akka {
- version = "2.4.10"
+ version = "2.4.11"
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
index 75dc95a..9ff701c 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
@@ -29,14 +29,13 @@ import akka.stream.impl.Stages.SymbolicGraphStage
import akka.stream.impl.StreamLayout._
import akka.stream.impl._
import akka.stream.impl.fusing.{GraphInterpreterShell, GraphStageModule}
-import akka.stream.scaladsl.ModuleExtractor
import akka.stream.stage.GraphStage
import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
import org.apache.gearpump.akkastream.graph.LocalGraph.LocalGraphMaterializer
import org.apache.gearpump.akkastream.graph.RemoteGraph.RemoteGraphMaterializer
import org.apache.gearpump.akkastream.graph._
-import org.apache.gearpump.akkastream.util.MaterializedValueOps
+import org.apache.gearpump.util.{Graph => GGraph}
import scala.collection.mutable
import scala.concurrent.{ExecutionContextExecutor, Promise}
@@ -137,7 +136,7 @@ class GearpumpMaterializer(override val system: ActorSystem,
override def logger: LoggingAdapter = Logging.getLogger(system, this)
- override def isShutdown: Boolean = system.isTerminated
+ override def isShutdown: Boolean = system.whenTerminated.isCompleted
override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
import ActorAttributes._
@@ -177,7 +176,6 @@ class GearpumpMaterializer(override val system: ActorSystem,
Nil)
val info = Fusing.aggressive(runnableGraph).module.info
- import _root_.org.apache.gearpump.util.{Graph => GGraph}
val graph = GGraph.empty[Module, Edge]
info.allModules.foreach(module => {
@@ -204,33 +202,7 @@ class GearpumpMaterializer(override val system: ActorSystem,
})
if(Debug) {
- val iterator = graph.topologicalOrderIterator
- while (iterator.hasNext) {
- val module = iterator.next()
- // scalastyle:off println
- module match {
- case graphStageModule: GraphStageModule =>
- graphStageModule.stage match {
- case symbolicGraphStage: SymbolicGraphStage[_, _, _] =>
- val symbolicName = symbolicGraphStage.symbolicStage.getClass.getSimpleName
- println(
- s"${module.getClass.getSimpleName}(${symbolicName})"
- )
- case graphStage: GraphStage[_] =>
- val name = graphStage.getClass.getSimpleName
- println(
- s"${module.getClass.getSimpleName}(${name})"
- )
- case other =>
- println(
- s"${module.getClass.getSimpleName}(${other.getClass.getSimpleName})"
- )
- }
- case _ =>
- println(module.getClass.getSimpleName)
- }
- // scalastyle:on println
- }
+ printGraph(graph)
}
val subGraphs = GraphPartitioner(strategy).partition(graph)
@@ -266,6 +238,36 @@ class GearpumpMaterializer(override val system: ActorSystem,
rt.getOrElse(null).asInstanceOf[Mat]
}
+ private def printGraph(graph: GGraph[Module, Edge]): Unit = {
+ val iterator = graph.topologicalOrderIterator
+ while (iterator.hasNext) {
+ val module = iterator.next()
+ // scalastyle:off println
+ module match {
+ case graphStageModule: GraphStageModule =>
+ graphStageModule.stage match {
+ case symbolicGraphStage: SymbolicGraphStage[_, _, _] =>
+ val symbolicName = symbolicGraphStage.symbolicStage.getClass.getSimpleName
+ println(
+ s"${module.getClass.getSimpleName}(${symbolicName})"
+ )
+ case graphStage: GraphStage[_] =>
+ val name = graphStage.getClass.getSimpleName
+ println(
+ s"${module.getClass.getSimpleName}(${name})"
+ )
+ case other =>
+ println(
+ s"${module.getClass.getSimpleName}(${other.getClass.getSimpleName})"
+ )
+ }
+ case _ =>
+ println(module.getClass.getSimpleName)
+ }
+ // scalastyle:on println
+ }
+ }
+
override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
subflowFuser: GraphInterpreterShell => ActorRef): Mat = {
materialize(runnableGraph)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
index 2ce4e19..40cd556 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
@@ -21,6 +21,7 @@ package org.apache.gearpump.akkastream.example
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.graph.GraphPartitioner
import org.apache.gearpump.cluster.main.ArgumentsParser
import org.apache.gearpump.util.AkkaApp
@@ -37,7 +38,7 @@ object Test extends AkkaApp with ArgumentsParser {
// scalastyle:off println
override def main(akkaConf: Config, args: Array[String]): Unit = {
implicit val system = ActorSystem("Test", akkaConf)
- implicit val materializer = GearpumpMaterializer()
+ implicit val materializer = GearpumpMaterializer(GraphPartitioner.AllRemoteStrategy)
val echo = system.actorOf(Props(new Echo()))
val sink = Sink.actorRef(echo, "COMPLETE")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
index 7e2211d..830f278 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
@@ -81,13 +81,11 @@ object WikipediaApp extends ArgumentsParser with AkkaApp {
}
)
- g.run().onComplete { x =>
- x match {
- case Success((t, f)) => printResults(t, f)
- // scalastyle:off println
- case Failure(tr) => println("Something went wrong")
- // scalastyle:on println
- }
+ g.run().onComplete {
+ case Success((t, f)) => printResults(t, f)
+ // scalastyle:off println
+ case Failure(tr) => println("Something went wrong")
+ // scalastyle:on println
}
Await.result(system.whenTerminated, 60.minutes)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
index f3f8094..936ac29 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -28,14 +28,16 @@ import akka.stream.impl.{HeadOptionStage, Stages, Throttle}
import akka.stream.scaladsl._
import akka.stream.stage.AbstractStage.PushPullGraphStageWithMaterializedValue
import akka.stream.stage.GraphStage
-import akka.stream.{FanInShape, FanOutShape}
import org.apache.gearpump.akkastream.GearAttributes
import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
import org.apache.gearpump.akkastream.module._
import org.apache.gearpump.akkastream.task._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.dsl.op._
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
import org.apache.gearpump.util.Graph
import org.slf4j.LoggerFactory
@@ -96,14 +98,14 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
vertex.shape.inlets.flatMap { inlet =>
graph.incomingEdgesOf(vertex).find(
_._2.to == inlet).map(_._1
- ).flatMap(processorIds.get(_))
+ ).flatMap(processorIds.get)
}.toList
}
def outProcessors(vertex: Module): List[ProcessorId] = {
vertex.shape.outlets.flatMap { outlet =>
graph.outgoingEdgesOf(vertex).find(
_._2.from == outlet).map(_._3
- ).flatMap(processorIds.get(_))
+ ).flatMap(processorIds.get)
}.toList
}
processorIds.get(vertex).map(processorId => {
@@ -165,6 +167,8 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
reduceOp(reduce.f, conf)
case graphStage: GraphStageModule =>
translateGraphStageWithMaterializedValue(graphStage, parallelism, conf)
+ case _ =>
+ null
}
if (op == null) {
throw new UnsupportedOperationException(
@@ -174,12 +178,11 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
op
}.mapEdge[OpEdge] { (n1, edge, n2) =>
n2 match {
- case master: MasterOp =>
- Shuffle
- case slave: SlaveOp[_] if n1.isInstanceOf[ProcessorOp[_]] =>
- Shuffle
- case slave: SlaveOp[_] =>
+ case chainableOp: ChainableOp[_, _]
+ if !n1.isInstanceOf[ProcessorOp[_]] && !n2.isInstanceOf[ProcessorOp[_]] =>
Direct
+ case _ =>
+ Shuffle
}
}
(opGraph, matValues.toMap)
@@ -237,7 +240,8 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
withValue(FoldTask.AGGREGATOR, fold.f)
ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold")
case groupBy: GroupBy[_, _] =>
- GroupByOp(groupBy.keyFor, groupBy.maxSubstreams, "groupBy", conf)
+ GroupByOp(GroupAlsoByWindow(groupBy.keyFor, CountWindow.apply(1).accumulating),
+ groupBy.maxSubstreams, "groupBy", conf)
case groupedWithin: GroupedWithin[_] =>
val diConf = conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d).
withInt(GroupedWithinTask.BATCH_SIZE, groupedWithin.n)
@@ -318,11 +322,11 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
// TODO
null
case unzip: Unzip[_, _] =>
- ProcessorOp(classOf[Unzip2Task[_, _, _]],
- parallelism,
- conf.withValue(
- Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper)
- ), "unzip")
+// ProcessorOp(classOf[Unzip2Task[_, _, _]], parallelism,
+// conf.withValue(
+// Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper)), "unzip")
+ // TODO
+ null
case zip: Zip[_, _] =>
zipWithOp(zip.zipper, conf)
case zipWith2: ZipWith2[_, _, _] =>
@@ -474,10 +478,10 @@ object RemoteMaterializerImpl {
def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String,
conf: UserConfig): Op = {
- FlatMapOp(fun, description, conf)
+ ChainableOp(new FlatMapFunction[In, Out](fun, description), conf)
}
- def conflatOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out,
+ def conflateOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out,
conf: UserConfig): Op = {
var agg = None: Option[Out]
val flatMap = {elem: In =>
@@ -489,7 +493,7 @@ object RemoteMaterializerImpl {
}
List(agg.get)
}
- flatMapOp (flatMap, "conflat", conf)
+ flatMapOp (flatMap, "conflate", conf)
}
def foldOp[In, Out](zero: Out, fold: (Out, In) => Out, conf: UserConfig): Op = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
index 85b1d5e..80619ef 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
@@ -56,7 +56,7 @@ object GearSource{
*
*/
def from[OUT](source: DataSource): Source[OUT, Unit] = {
- val taskSource = new Source[OUT, Unit](new SourceTaskModule(source, UserConfig.empty))
+ val taskSource = new Source[OUT, Unit](SourceTaskModule(source, UserConfig.empty))
taskSource
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
index 99f1b55..7dd91fc 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
@@ -40,7 +40,7 @@ class Unzip2Task[In, A1, A2](context: TaskContext, userConf : UserConfig)
}
object Unzip2Task {
- case class UnZipFunction[In, A1, A2](val unzip: In => (A1, A2)) extends Serializable
+ case class UnZipFunction[In, A1, A2](unzip: In => (A1, A2)) extends Serializable
val UNZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.unzip2.function"
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index f1e0443..a1e6ca5 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -35,8 +35,7 @@ object Build extends sbt.Build {
val copySharedSourceFiles = TaskKey[Unit]("copied shared services source code")
- val akkaVersion = "2.4.10"
- val akkaStreamVersion = "2.4-SNAPSHOT"
+ val akkaVersion = "2.4.11"
val apacheRepo = "https://repository.apache.org/"
val hadoopVersion = "2.6.0"
val hbaseVersion = "1.0.0"
@@ -148,10 +147,8 @@ object Build extends sbt.Build {
"commons-logging" % "commons-logging" % commonsLoggingVersion,
"com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion
exclude("com.typesafe.akka", "akka-stream_2.11"),
- "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion,
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided"
- ),
- dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion
+ )
)
val coreDependencies = Seq(
@@ -353,7 +350,6 @@ object Build extends sbt.Build {
"com.github.scribejava" % "scribejava-apis" % "2.4.0",
"com.ning" % "async-http-client" % "1.9.33",
"org.webjars" % "angularjs" % "1.4.9",
- "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
// angular 1.5 breaks ui-select, but we need ng-touch 1.5
"org.webjars.npm" % "angular-touch" % "1.5.0",
@@ -417,12 +413,12 @@ object Build extends sbt.Build {
settings = commonSettings ++ noPublish ++
Seq(
libraryDependencies ++= Seq(
- "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion,
+ "com.typesafe.akka" %% "akka-stream" % akkaVersion,
+ "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
"org.json4s" %% "json4s-jackson" % "3.2.11",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test"
- ),
- dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion
- ))
+ )
+ ))
.dependsOn (services % "test->test; compile->compile", daemon % "test->test; compile->compile")
.disablePlugins(sbtassembly.AssemblyPlugin)
@@ -436,7 +432,7 @@ object Build extends sbt.Build {
),
mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test")
))
- .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+ .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
lazy val storm = Project(
id = "gearpump-experiments-storm",
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index 47d3064..1c87653 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -69,8 +69,7 @@ object Pack extends sbt.Build {
"worker" -> "org.apache.gearpump.cluster.main.Worker",
"services" -> "org.apache.gearpump.services.main.Services",
"yarnclient" -> "org.apache.gearpump.experiments.yarn.client.Client",
- "storm" -> "org.apache.gearpump.experiments.storm.StormRunner",
- "akkastream" -> "org.apache.gearpump.akkastream.example.Test11"
+ "storm" -> "org.apache.gearpump.experiments.storm.StormRunner"
),
packJvmOpts := Map(
"gear" -> Seq("-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}"),
@@ -110,13 +109,7 @@ object Pack extends sbt.Build {
"storm" -> Seq(
"-server",
"-Djava.net.preferIPv4Stack=true",
- "-Dgearpump.home=${PROG_HOME}"),
-
- "akkastream" -> Seq(
- "-server",
- "-Djava.net.preferIPv4Stack=true",
- "-Dgearpump.home=${PROG_HOME}",
- "-Djava.rmi.server.hostname=localhost")
+ "-Dgearpump.home=${PROG_HOME}")
),
packLibDir := Map(
"lib" -> new ProjectsToPack(core.id, streaming.id),
@@ -148,14 +141,13 @@ object Pack extends sbt.Build {
"worker" -> daemonClassPath,
"services" -> serviceClassPath,
"yarnclient" -> yarnClassPath,
- "storm" -> stormClassPath,
- "akkstream" -> daemonClassPath
+ "storm" -> stormClassPath
),
packArchivePrefix := projectName + "-" + scalaBinaryVersion.value,
packArchiveExcludes := Seq("integrationtest")
)
- ).dependsOn(core, streaming, services, yarn, storm, akkastream).
+ ).dependsOn(core, streaming, services, yarn, storm).
disablePlugins(sbtassembly.AssemblyPlugin)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index a6588a1..66ec873 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -123,7 +123,7 @@ object LifeTime {
*/
class StreamApplication(
override val name: String, val inputUserConfig: UserConfig,
- dag: Graph[ProcessorDescription, PartitionerDescription])
+ val dag: Graph[ProcessorDescription, PartitionerDescription])
extends Application {
require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 744976b..b2c5506 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -25,7 +25,8 @@ import org.apache.gearpump.streaming.Processor.DefaultProcessor
import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
import org.apache.gearpump.streaming.{Constants, Processor}
import org.apache.gearpump.streaming.dsl.task.TransformTask
-import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.dsl.window.api.{CountWindow, GroupByFn}
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
import org.apache.gearpump.streaming.task.Task
@@ -124,11 +125,11 @@ case class DataSinkOp(
* to another Op to be used
*/
case class ChainableOp[IN, OUT](
- fn: SingleInputFunction[IN, OUT]) extends Op {
+ fn: SingleInputFunction[IN, OUT],
+ userConfig: UserConfig = UserConfig.empty) extends Op {
override def description: String = fn.description
- override def userConfig: UserConfig = UserConfig.empty
override def chain(other: Op)(implicit system: ActorSystem): Op = {
other match {
@@ -141,7 +142,17 @@ case class ChainableOp[IN, OUT](
}
override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
- throw new UnsupportedOperationException("ChainedOp cannot be translated to Processor")
+ Processor[TransformTask[Any, Any]](1, description,
+ userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, fn))
+ }
+}
+
+object GroupByOp {
+
+ def apply[IN, GROUP](groupBy: IN => GROUP, parallelism: Int,
+ description: String, userConfig: UserConfig): Op = {
+ GroupByOp(GroupAlsoByWindow(groupBy, CountWindow.apply(1).accumulating), parallelism,
+ description, userConfig)
}
}