You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/09 00:52:12 UTC
[2/2] incubator-gearpump git commit: [GEARPUMP-23] add window dsl
[GEARPUMP-23] add window dsl
This PR is opened for early review; the work is still in progress, with the following todos.
- [x] basic window dsl support with `WindowedWordCount` example
- [x] improve `ReduceFunction` to not emit intermediate results
- [x] add unit tests
- [ ] add comments and update documentation
- [ ] support different types of computation (e.g. monoid which doesn't require input elements to be held in the window)
Author: manuzhang <ow...@gmail.com>
Closes #85 from manuzhang/window_dsl.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/66017ab7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/66017ab7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/66017ab7
Branch: refs/heads/master
Commit: 66017ab7bf5166ec312684f0e3e49e7219b4c24d
Parents: 5c4d60c
Author: manuzhang <ow...@gmail.com>
Authored: Sun Oct 9 08:52:00 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sun Oct 9 08:52:00 2016 +0800
----------------------------------------------------------------------
.../wordcount/dsl/WindowedWordCount.scala | 87 +++++
.../apache/gearpump/streaming/Constants.scala | 1 +
.../gearpump/streaming/StreamApplication.scala | 2 +-
.../apache/gearpump/streaming/dsl/Stream.scala | 106 +++---
.../gearpump/streaming/dsl/StreamApp.scala | 34 +-
.../streaming/dsl/javaapi/JavaStream.scala | 22 +-
.../apache/gearpump/streaming/dsl/op/OP.scala | 109 ------
.../dsl/partitioner/GroupByPartitioner.scala | 49 +++
.../dsl/partitioner/GroupbyPartitioner.scala | 46 ---
.../apache/gearpump/streaming/dsl/plan/OP.scala | 214 ++++++++++++
.../streaming/dsl/plan/OpTranslator.scala | 222 -------------
.../gearpump/streaming/dsl/plan/Planner.scala | 65 ++--
.../plan/functions/SingleInputFunction.scala | 107 ++++++
.../streaming/dsl/task/CountTriggerTask.scala | 63 ++++
.../dsl/task/EventTimeTriggerTask.scala | 59 ++++
.../dsl/task/ProcessingTimeTriggerTask.scala | 82 +++++
.../streaming/dsl/task/TransformTask.scala | 47 +++
.../dsl/window/api/AccumulationMode.scala | 24 ++
.../streaming/dsl/window/api/GroupByFn.scala | 47 +++
.../streaming/dsl/window/api/Trigger.scala | 27 ++
.../streaming/dsl/window/api/Window.scala | 77 +++++
.../streaming/dsl/window/api/WindowFn.scala | 63 ++++
.../dsl/window/impl/ReduceFnRunner.scala | 29 ++
.../streaming/dsl/window/impl/Window.scala | 75 +++++
.../dsl/window/impl/WindowRunner.scala | 114 +++++++
.../streaming/source/DataSourceTask.scala | 15 +-
.../gearpump/streaming/task/TaskActor.scala | 4 +-
.../gearpump/streaming/dsl/StreamAppSpec.scala | 67 ++--
.../gearpump/streaming/dsl/StreamSpec.scala | 24 +-
.../partitioner/GroupByPartitionerSpec.scala | 23 +-
.../gearpump/streaming/dsl/plan/OpSpec.scala | 244 ++++++++++++++
.../streaming/dsl/plan/OpTranslatorSpec.scala | 148 ---------
.../streaming/dsl/plan/PlannerSpec.scala | 132 ++++++++
.../functions/SingleInputFunctionSpec.scala | 333 +++++++++++++++++++
.../dsl/task/CountTriggerTaskSpec.scala | 61 ++++
.../dsl/task/EventTimeTriggerTaskSpec.scala | 66 ++++
.../task/ProcessingTimeTriggerTaskSpec.scala | 69 ++++
37 files changed, 2246 insertions(+), 711 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
new file mode 100644
index 0000000..4f43fd4
--- /dev/null
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.wordcount.dsl
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp}
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow}
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.AkkaApp
+
+/**
+ * Word count example that counts words per fixed event-time window using the stream DSL.
+ *
+ * The input comes from a small in-memory source of timestamped messages, and the per-window
+ * counts are written through a [[LoggerSink]].
+ */
+object WindowedWordCount extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  /** Builds the windowed word count application, submits it and closes the client context. */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+    val app = StreamApp("dsl", context)
+
+    app.source[String](new TimedDataSource)
+      // line => (word, 1) for every whitespace-separated word
+      .flatMap(line => line.split("[\\s]+"))
+      .map((_, 1))
+      // fixed window of 5 milliseconds, fired by the event time trigger
+      .window(FixedWindow.apply(Duration.ofMillis(5L)).triggering(EventTimeTrigger))
+      // (word, count1), (word, count2) => (word, count1 + count2)
+      .groupBy(_._1)
+      .sum
+      .sink(new LoggerSink)
+
+    context.submit(app)
+    context.close()
+  }
+
+  /**
+   * In-memory source that emits a fixed list of timestamped messages, one per `read()` call.
+   *
+   * The watermark follows the timestamp of the last message read. Once all messages have been
+   * read, every `getWatermark` call moves the watermark forward by one millisecond.
+   */
+  private class TimedDataSource extends DataSource {
+
+    // Messages not yet emitted, in emission order.
+    private var data = List(
+      Message("foo", 1L),
+      Message("bar", 2L),
+      Message("foo", 3L),
+      Message("foo", 5L),
+      Message("bar", 7L),
+      Message("bar", 8L)
+    )
+
+    private var watermark: Instant = Instant.ofEpochMilli(0)
+
+    override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+    /** Returns the next pending message, or null when none are left. */
+    override def read(): Message = data match {
+      case next :: remaining =>
+        data = remaining
+        watermark = Instant.ofEpochMilli(next.timestamp)
+        next
+      case Nil =>
+        null
+    }
+
+    override def close(): Unit = {}
+
+    /** Returns the current watermark, advancing it by 1 ms first if all data has been read. */
+    override def getWatermark: Instant = {
+      val drained = data.isEmpty
+      if (drained) watermark = watermark.plusMillis(1)
+      watermark
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
index cd33b50..f99a436 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -22,6 +22,7 @@ object Constants {
val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator"
val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source"
val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function"
+ val GEARPUMP_STREAMING_WINDOW_FUNCTION = "gearpump.streaming.dsl.window-function"
val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities"
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index 66ec873..a6588a1 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -123,7 +123,7 @@ object LifeTime {
*/
class StreamApplication(
override val name: String, val inputUserConfig: UserConfig,
- val dag: Graph[ProcessorDescription, PartitionerDescription])
+ dag: Graph[ProcessorDescription, PartitionerDescription])
extends Application {
require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
index 786d496..440a45e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
@@ -20,7 +20,10 @@ package org.apache.gearpump.streaming.dsl
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.op._
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.dsl.window.impl._
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.Graph
@@ -35,12 +38,12 @@ class Stream[T](
/**
* converts a value[T] to a list of value[R]
*
- * @param fun FlatMap function
+ * @param fn FlatMap function
* @param description The description message for this operation
* @return A new stream with type [R]
*/
- def flatMap[R](fun: T => TraversableOnce[R], description: String = null): Stream[R] = {
- val flatMapOp = FlatMapOp(fun, Option(description).getOrElse("flatmap"))
+ def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = {
+ val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description))
graph.addVertex(flatMapOp)
graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp)
new Stream[R](graph, flatMapOp)
@@ -49,36 +52,36 @@ class Stream[T](
/**
* Maps message of type T to message of type R
*
- * @param fun Function
+ * @param fn Function
* @return A new stream with type [R]
*/
- def map[R](fun: T => R, description: String = null): Stream[R] = {
+ def map[R](fn: T => R, description: String = "map"): Stream[R] = {
this.flatMap({ data =>
- Option(fun(data))
- }, Option(description).getOrElse("map"))
+ Option(fn(data))
+ }, description)
}
/**
* Keeps records when fn(T) == true
*
- * @param fun the filter
+ * @param fn the filter
* @return a new stream after filter
*/
- def filter(fun: T => Boolean, description: String = null): Stream[T] = {
+ def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
this.flatMap({ data =>
- if (fun(data)) Option(data) else None
- }, Option(description).getOrElse("filter"))
+ if (fn(data)) Option(data) else None
+ }, description)
}
/**
* Reduces operations.
*
- * @param fun reduction function
+ * @param fn reduction function
* @param description description message for this operator
* @return a new stream after reduction
*/
- def reduce(fun: (T, T) => T, description: String = null): Stream[T] = {
- val reduceOp = ReduceOp(fun, Option(description).getOrElse("reduce"))
+ def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
+ val reduceOp = ChainableOp(new ReduceFunction(fn, description))
graph.addVertex(reduceOp)
graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp)
new Stream(graph, reduceOp)
@@ -88,7 +91,10 @@ class Stream[T](
* Log to task log file
*/
def log(): Unit = {
- this.map(msg => LoggerFactory.getLogger("dsl").info(msg.toString), "log")
+ this.map(msg => {
+ LoggerFactory.getLogger("dsl").info(msg.toString)
+ msg
+ }, "log")
}
/**
@@ -97,8 +103,8 @@ class Stream[T](
* @param other the other stream
* @return the merged stream
*/
- def merge(other: Stream[T], description: String = null): Stream[T] = {
- val mergeOp = MergeOp(Option(description).getOrElse("merge"))
+ def merge(other: Stream[T], description: String = "merge"): Stream[T] = {
+ val mergeOp = MergeOp(description, UserConfig.empty)
graph.addVertex(mergeOp)
graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
@@ -115,20 +121,29 @@ class Stream[T](
*
* For example,
* {{{
- * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
+ * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
* }}}
*
- * @param fun Group by function
+ * @param fn Group by function
* @param parallelism Parallelism level
* @param description The description
* @return the grouped stream
*/
- def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null)
- : Stream[T] = {
- val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy"))
- graph.addVertex(groupOp)
- graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
- new Stream[T](graph, groupOp)
+ def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+ description: String = "groupBy"): Stream[T] = {
+ window(CountWindow.apply(1).accumulating)
+ .groupBy[GROUP](fn, parallelism, description)
+ }
+
+ /**
+ * Window function
+ *
+ * @param win window definition
+ * @param description window description
+ * @return [[WindowStream]] where groupBy could be applied
+ */
+ def window(win: Window, description: String = "window"): WindowStream[T] = {
+ new WindowStream[T](graph, edge, thisNode, win, description)
}
/**
@@ -140,15 +155,28 @@ class Stream[T](
*/
def process[R](
processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
- description: String = null): Stream[R] = {
- val processorOp = ProcessorOp(processor, parallelism, conf,
- Option(description).getOrElse("process"))
+ description: String = "process"): Stream[R] = {
+ val processorOp = ProcessorOp(processor, parallelism, conf, description)
graph.addVertex(processorOp)
graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
new Stream[R](graph, processorOp, Some(Shuffle))
}
}
+/**
+ * A stream with a window definition attached, on which groupBy can be applied.
+ *
+ * @param graph graph of ops that the group by op is added to
+ * @param edge edge used to connect `thisNode` to the group by op; Shuffle when empty
+ * @param thisNode op that the group by op is connected from
+ * @param window window definition passed to the group by function
+ * @param winDesc window description, used as prefix of the group by op's description
+ */
+class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op,
+    window: Window, winDesc: String) {
+
+  /**
+   * Adds a group by op built from `fn` and the window to the graph.
+   *
+   * @param fn function that computes the group of an element
+   * @param parallelism parallelism passed to the group by op
+   * @param description description appended to the window description as "winDesc.description"
+   * @return a new stream positioned at the group by op
+   */
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    val opDescription = s"$winDesc.$description"
+    val incomingEdge = edge.getOrElse(Shuffle)
+    val windowedGroupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window)
+    val groupByOp =
+      GroupByOp[T, (GROUP, List[Bucket])](windowedGroupBy, parallelism, opDescription)
+    graph.addVertex(groupByOp)
+    graph.addEdge(thisNode, incomingEdge, groupByOp)
+    new Stream[T](graph, groupByOp)
+  }
+}
+
class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
/**
* GroupBy key
@@ -192,30 +220,18 @@ object Stream {
}
implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
- def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, description: String)
- : Stream[T] = {
- implicit val sink = DataSinkOp[T](dataSink, parallism, conf,
- Some(description).getOrElse("traversable"))
+ def sink(dataSink: DataSink, parallelism: Int = 1,
+ conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = {
+ implicit val sink = DataSinkOp(dataSink, parallelism, conf, description)
stream.graph.addVertex(sink)
stream.graph.addEdge(stream.thisNode, Shuffle, sink)
new Stream[T](stream.graph, sink)
}
-
- def sink[T](
- sink: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty,
- description: String = null): Stream[T] = {
- val sinkOp = ProcessorOp(sink, parallism, conf, Option(description).getOrElse("source"))
- stream.graph.addVertex(sinkOp)
- stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
- new Stream[T](stream.graph, sinkOp)
- }
}
}
class LoggerSink[T] extends DataSink {
- var logger: Logger = null
-
- private var context: TaskContext = null
+ var logger: Logger = _
override def open(context: TaskContext): Unit = {
this.logger = context.logger
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
index d45737b..8116146 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
@@ -24,10 +24,9 @@ import akka.actor.ActorSystem
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.streaming.StreamApplication
-import org.apache.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp}
-import org.apache.gearpump.streaming.dsl.plan.Planner
+import org.apache.gearpump.streaming.dsl.plan._
import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.util.Graph
import org.apache.gearpump.Message
@@ -50,7 +49,8 @@ import scala.language.implicitConversions
* @param name name of app
*/
class StreamApp(
- val name: String, system: ActorSystem, userConfig: UserConfig, val graph: Graph[Op, OpEdge]) {
+ name: String, system: ActorSystem, userConfig: UserConfig,
+ private val graph: Graph[Op, OpEdge]) {
def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
this(name, system, userConfig, Graph.empty[Op, OpEdge])
@@ -76,34 +76,16 @@ object StreamApp {
implicit class Source(app: StreamApp) extends java.io.Serializable {
- def source[T](dataSource: DataSource, parallelism: Int): Stream[T] = {
- source(dataSource, parallelism, UserConfig.empty)
- }
-
- def source[T](dataSource: DataSource, parallelism: Int, description: String): Stream[T] = {
- source(dataSource, parallelism, UserConfig.empty, description)
- }
-
- def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig): Stream[T] = {
- source(dataSource, parallelism, conf, description = null)
- }
-
- def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String)
- : Stream[T] = {
+ def source[T](dataSource: DataSource, parallelism: Int = 1,
+ conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description)
app.graph.addVertex(sourceOp)
new Stream[T](app.graph, sourceOp)
}
+
def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
}
-
- def source[T](source: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String)
- : Stream[T] = {
- val sourceOp = ProcessorOp(source, parallelism, conf, Option(description).getOrElse("source"))
- app.graph.addVertex(sourceOp)
- new Stream[T](app.graph, sourceOp)
- }
}
}
@@ -115,7 +97,7 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
override def read(): Message = {
if (iterator.hasNext) {
- Message(iterator.next())
+ Message(iterator.next(), Instant.now().toEpochMilli)
} else {
null
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index 6eff20c..3003b98 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -19,9 +19,9 @@
package org.apache.gearpump.streaming.dsl.javaapi
import scala.collection.JavaConverters._
-
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.Stream
+import org.apache.gearpump.streaming.dsl.window.api.Window
+import org.apache.gearpump.streaming.dsl.{Stream, WindowStream}
import org.apache.gearpump.streaming.javaapi.dsl.functions._
import org.apache.gearpump.streaming.task.Task
@@ -63,9 +63,13 @@ class JavaStream[T](val stream: Stream[T]) {
* Group by a stream and turns it to a list of sub-streams. Operations chained after
* groupBy applies to sub-streams.
*/
- def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String)
- : JavaStream[T] = {
- new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, description))
+  def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
+      parallelism: Int, description: String): JavaStream[T] = {
+    // fn must be applied to the element: `(t: T) => fn` would return the function object
+    // itself as the key, so every element would end up in one and the same group.
+    // The explicit [GROUP] makes the compiler reject a lambda that does not yield a GROUP.
+    new JavaStream[T](stream.groupBy[GROUP]((t: T) => fn(t), parallelism, description))
+  }
+
+ def window(win: Window, description: String): JavaWindowStream[T] = {
+ new JavaWindowStream[T](stream.window(win, description))
}
/** Add a low level Processor to process messages */
@@ -75,3 +79,11 @@ class JavaStream[T](val stream: Stream[T]) {
new JavaStream[R](stream.process(processor, parallelism, conf, description))
}
}
+
+/**
+ * Java-facing wrapper around a [[WindowStream]].
+ *
+ * @param stream the window stream that calls are delegated to
+ */
+class JavaWindowStream[T](stream: WindowStream[T]) {
+
+  /**
+   * Groups the windowed stream by the key that `fn` computes for each element.
+   *
+   * @param fn function applied to every element to obtain its group
+   * @param parallelism passed through to the wrapped stream's groupBy
+   * @param description passed through to the wrapped stream's groupBy
+   * @return the grouped stream wrapped as a [[JavaStream]]
+   */
+  def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
+      description: String): JavaStream[T] = {
+    // fn must be applied to the element: `(t: T) => fn` would return the function object
+    // itself as the key, so every element would end up in one and the same group.
+    // The explicit [GROUP] makes the compiler reject a lambda that does not yield a GROUP.
+    new JavaStream[T](stream.groupBy[GROUP]((t: T) => fn(t), parallelism, description))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
deleted file mode 100644
index 49d9dec..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.op
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.Task
-
-/**
- * Operators for the DSL
- */
-sealed trait Op {
- def description: String
- def conf: UserConfig
-}
-
-/**
- * When translated to running DAG, SlaveOP can be attach to MasterOP or other SlaveOP
- * "Attach" means running in same Actor.
- */
-trait SlaveOp[T] extends Op
-
-case class FlatMapOp[T, R](
- fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = UserConfig.empty)
- extends SlaveOp[T]
-
-case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig = UserConfig.empty)
- extends SlaveOp[T]
-
-trait MasterOp extends Op
-
-trait ParameterizedOp[T] extends MasterOp
-
-case class MergeOp(description: String, override val conf: UserConfig = UserConfig.empty)
- extends MasterOp
-
-case class GroupByOp[T, R](
- fun: T => R, parallelism: Int, description: String,
- override val conf: UserConfig = UserConfig.empty)
- extends ParameterizedOp[T]
-
-case class ProcessorOp[T <: Task](
- processor: Class[T], parallelism: Int, conf: UserConfig, description: String)
- extends ParameterizedOp[T]
-
-case class DataSourceOp[T](
- dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String)
- extends ParameterizedOp[T]
-
-case class DataSinkOp[T](
- dataSink: DataSink, parallelism: Int, conf: UserConfig, description: String)
- extends ParameterizedOp[T]
-
-/**
- * Contains operators which can be chained to single one.
- *
- * For example, flatmap().map().reduce() can be chained to single operator as
- * no data shuffling is required.
- * @param ops list of operations
- */
-case class OpChain(ops: List[Op]) extends Op {
- def head: Op = ops.head
- def last: Op = ops.last
-
- def description: String = null
-
- override def conf: UserConfig = {
- // The head's conf has priority
- ops.reverse.foldLeft(UserConfig.empty) { (conf, op) =>
- conf.withConfig(op.conf)
- }
- }
-}
-
-trait OpEdge
-
-/**
- * The upstream OP and downstream OP doesn't require network data shuffle.
- *
- * For example, map, flatmap operation doesn't require network shuffle, we can use Direct
- * to represent the relation with upstream operators.
- */
-case object Direct extends OpEdge
-
-/**
- * The upstream OP and downstream OP DOES require network data shuffle.
- *
- * For example, map, flatmap operation doesn't require network shuffle, we can use Direct
- * to represent the relation with upstream operators.
- */
-case object Shuffle extends OpEdge
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
new file mode 100644
index 0000000..2ec881b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.partitioner
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.partitioner.UnicastPartitioner
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+
+/**
+ * Picks the partition of a message from the hashCode of the message's group.
+ *
+ * The group is obtained by calling `fn.groupBy(message)`. The sign bit of its hashCode is
+ * masked off before taking the remainder, so the returned partition is never negative.
+ *
+ * For example:
+ * {{{
+ * case class People(name: String, gender: String)
+ *
+ * object Test {
+ *
+ *   // any GroupByFn over People, e.g. one that groups people by gender
+ *   val groupBy: GroupByFn[People, String] = ...
+ *   val partitioner = new GroupByPartitioner(groupBy)
+ * }
+ * }}}
+ *
+ * @param fn group by function applied to each message; the hashCode of its output decides
+ *   the partition, so hashCode() must be defined for the output type.
+ */
+class GroupByPartitioner[T, Group](fn: GroupByFn[T, Group])
+  extends UnicastPartitioner {
+  override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    val nonNegativeHash = fn.groupBy(message).hashCode() & Integer.MAX_VALUE
+    nonNegativeHash % partitionNum
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
deleted file mode 100644
index b2e2932..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.partitioner
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.partitioner.UnicastPartitioner
-
-/**
- * Partition messages by applying group by function first.
- *
- * For example:
- * {{{
- * case class People(name: String, gender: String)
- *
- * object Test{
- *
- * val groupBy: (People => String) = people => people.gender
- * val partitioner = GroupByPartitioner(groupBy)
- * }
- * }}}
- *
- * @param groupBy First apply message with groupBy function, then pick the hashCode of the output
- * to do the partitioning. You must define hashCode() for output type of groupBy function.
- */
-class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends UnicastPartitioner {
- override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
- val hashCode = groupBy(msg.msg.asInstanceOf[T]).hashCode()
- (hashCode & Integer.MAX_VALUE) % partitionNum
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
new file mode 100644
index 0000000..744976b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.plan
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.Processor.DefaultProcessor
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.{Constants, Processor}
+import org.apache.gearpump.streaming.dsl.task.TransformTask
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
+import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
+import org.apache.gearpump.streaming.task.Task
+
+import scala.reflect.ClassTag
+
+/**
+ * This is a vertex on the logical plan.
+ */
+sealed trait Op {
+
+ def description: String
+
+ def userConfig: UserConfig
+
+ def chain(op: Op)(implicit system: ActorSystem): Op
+
+ def getProcessor(implicit system: ActorSystem): Processor[_ <: Task]
+}
+
+/**
+ * This represents a low level Processor.
+ */
+case class ProcessorOp[T <: Task](
+ processor: Class[T],
+ parallelism: Int,
+ userConfig: UserConfig,
+ description: String)
+ extends Op {
+
+ def this(
+ parallelism: Int = 1,
+ userConfig: UserConfig = UserConfig.empty,
+ description: String = "processor")(implicit classTag: ClassTag[T]) = {
+ this(classTag.runtimeClass.asInstanceOf[Class[T]], parallelism, userConfig, description)
+ }
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ throw new OpChainException(this, other)
+ }
+
+ override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ DefaultProcessor(parallelism, description, userConfig, processor)
+ }
+}
+
+/**
+ * This represents a DataSource.
+ */
+case class DataSourceOp(
+ dataSource: DataSource,
+ parallelism: Int = 1,
+ userConfig: UserConfig = UserConfig.empty,
+ description: String = "source")
+ extends Op {
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: ChainableOp[_, _] =>
+ DataSourceOp(dataSource, parallelism,
+ userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn),
+ description)
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ Processor[DataSourceTask[Any, Any]](parallelism, description,
+ userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
+ }
+}
+
+/**
+ * This represents a DataSink.
+ */
+case class DataSinkOp(
+ dataSink: DataSink,
+ parallelism: Int = 1,
+ userConfig: UserConfig = UserConfig.empty,
+ description: String = "sink")
+ extends Op {
+
+ override def chain(op: Op)(implicit system: ActorSystem): Op = {
+ throw new OpChainException(this, op)
+ }
+
+ override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ DataSinkProcessor(dataSink, parallelism, description)
+ }
+}
+
+/**
+ * This represents operations that can be chained together
+ * (e.g. flatMap, map, filter, reduce) and further chained
+ * to another Op to be used
+ */
+case class ChainableOp[IN, OUT](
+ fn: SingleInputFunction[IN, OUT]) extends Op {
+
+ override def description: String = fn.description
+
+ override def userConfig: UserConfig = UserConfig.empty
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: ChainableOp[OUT, _] =>
+ // TODO: preserve type info
+ ChainableOp(fn.andThen(op.fn))
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ throw new UnsupportedOperationException("ChainedOp cannot be translated to Processor")
+ }
+}
+
+/**
+ * This represents a Processor with window aggregation
+ */
+case class GroupByOp[IN, GROUP](
+ groupByFn: GroupByFn[IN, GROUP],
+ parallelism: Int = 1,
+ description: String = "groupBy",
+ override val userConfig: UserConfig = UserConfig.empty)
+ extends Op {
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: ChainableOp[_, _] =>
+ GroupByOp(groupByFn, parallelism, description,
+ userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ groupByFn.getProcessor(parallelism, description, userConfig)
+ }
+}
+
+/**
+ * This represents a Processor transforming merged streams
+ */
+case class MergeOp(description: String, userConfig: UserConfig = UserConfig.empty)
+ extends Op {
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: ChainableOp[_, _] =>
+ MergeOp(description, userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ Processor[TransformTask[Any, Any]](1, description, userConfig)
+ }
+
+}
+
+/**
+ * This is an edge on the logical plan.
+ */
+trait OpEdge
+
+/**
+ * The upstream OP and downstream OP don't require network data shuffle.
+ * e.g. ChainableOp
+ */
+case object Direct extends OpEdge
+
+/**
+ * The upstream OP and downstream OP DOES require network data shuffle.
+ * e.g. GroupByOp
+ */
+case object Shuffle extends OpEdge
+
+/**
+ * Runtime exception thrown on chaining.
+ */
+class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 cannot be chained by $op2")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
deleted file mode 100644
index 8de291c..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.plan
-
-import scala.collection.TraversableOnce
-import akka.actor.ActorSystem
-import org.slf4j.Logger
-import org.apache.gearpump._
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.op._
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.sink.DataSinkProcessor
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Translates a OP to a TaskDescription
- */
-class OpTranslator extends java.io.Serializable {
- val LOG: Logger = LogUtil.getLogger(getClass)
-
- def translate(ops: OpChain)(implicit system: ActorSystem): Processor[_ <: Task] = {
-
- val baseConfig = ops.conf
-
- ops.ops.head match {
- case op: MasterOp =>
- val tail = ops.ops.tail
- val func = toFunction(tail)
- val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)
-
- op match {
- case DataSourceOp(dataSource, parallelism, conf, description) =>
- Processor[DataSourceTask[Any, Any]](parallelism,
- description = description + "." + func.description,
- userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
- case groupby@GroupByOp(_, parallelism, description, _) =>
- Processor[GroupByTask[Object, Object, Object]](parallelism,
- description = description + "." + func.description,
- userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby))
- case merge: MergeOp =>
- Processor[TransformTask[Object, Object]](1,
- description = op.description + "." + func.description,
- userConfig)
- case ProcessorOp(processor, parallelism, conf, description) =>
- DefaultProcessor(parallelism,
- description = description + "." + func.description,
- userConfig, processor)
- case DataSinkOp(dataSink, parallelism, conf, description) =>
- DataSinkProcessor(dataSink, parallelism, description + func.description)
- }
- case op: SlaveOp[_] =>
- val func = toFunction(ops.ops)
- val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)
-
- Processor[TransformTask[Object, Object]](1,
- description = func.description,
- taskConf = userConfig)
- case chain: OpChain =>
- throw new RuntimeException("Not supposed to be called!")
- }
- }
-
- private def toFunction(ops: List[Op]): SingleInputFunction[Object, Object] = {
- val func: SingleInputFunction[Object, Object] = new DummyInputFunction[Object]()
- val totalFunction = ops.foldLeft(func) { (fun, op) =>
-
- val opFunction = op match {
- case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] =>
- new FlatMapFunction(flatmap.fun, flatmap.description)
- case reduce: ReduceOp[Object @unchecked] =>
- new ReduceFunction(reduce.fun, reduce.description)
- case _ =>
- throw new RuntimeException("Not supposed to be called!")
- }
- fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]])
- }
- totalFunction.asInstanceOf[SingleInputFunction[Object, Object]]
- }
-}
-
-object OpTranslator {
-
- trait SingleInputFunction[IN, OUT] extends Serializable {
- def process(value: IN): TraversableOnce[OUT]
- def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
- new AndThen(this, other)
- }
-
- def description: String
- }
-
- class DummyInputFunction[T] extends SingleInputFunction[T, T] {
- override def andThen[OUTER](other: SingleInputFunction[T, OUTER])
- : SingleInputFunction[T, OUTER] = {
- other
- }
-
- // Should never be called
- override def process(value: T): TraversableOnce[T] = None
-
- override def description: String = ""
- }
-
- class AndThen[IN, MIDDLE, OUT](
- first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
- extends SingleInputFunction[IN, OUT] {
-
- override def process(value: IN): TraversableOnce[OUT] = {
- first.process(value).flatMap(second.process)
- }
-
- override def description: String = {
- Option(first.description).flatMap { description =>
- Option(second.description).map(description + "." + _)
- }.orNull
- }
- }
-
- class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], descriptionMessage: String)
- extends SingleInputFunction[IN, OUT] {
-
- override def process(value: IN): TraversableOnce[OUT] = {
- fun(value)
- }
-
- override def description: String = {
- this.descriptionMessage
- }
- }
-
- class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String)
- extends SingleInputFunction[T, T] {
-
- private var state: Any = _
-
- override def process(value: T): TraversableOnce[T] = {
- if (state == null) {
- state = value
- } else {
- state = fun(state.asInstanceOf[T], value)
- }
- Some(state.asInstanceOf[T])
- }
-
- override def description: String = descriptionMessage
- }
-
- class GroupByTask[IN, GROUP, OUT](
- groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig)
- extends Task(taskContext, userConf) {
-
- def this(taskContext: TaskContext, userConf: UserConfig) = {
- this(userConf.getValue[GroupByOp[IN, GROUP]](
- GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun,
- taskContext, userConf)
- }
-
- private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]]
-
- override def onNext(msg: Message): Unit = {
- val time = msg.timestamp
-
- val group = groupBy(msg.msg.asInstanceOf[IN])
- if (!groups.contains(group)) {
- val operator =
- userConf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
- groups += group -> operator
- }
-
- val operator = groups(group)
-
- operator.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
- taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
- }
- }
- }
-
- class TransformTask[IN, OUT](
- operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
- userConf: UserConfig) extends Task(taskContext, userConf) {
-
- def this(taskContext: TaskContext, userConf: UserConfig) = {
- this(userConf.getValue[SingleInputFunction[IN, OUT]](
- GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
- }
-
- override def onNext(msg: Message): Unit = {
- val time = msg.timestamp
-
- operator match {
- case Some(op) =>
- op.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
- taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
- }
- case None =>
- taskContext.output(new Message(msg.msg, time))
- }
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index f5bbd65..16d5c06 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -22,7 +22,6 @@ import akka.actor.ActorSystem
import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner}
import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.streaming.dsl.op._
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
import org.apache.gearpump.streaming.task.Task
import org.apache.gearpump.util.Graph
@@ -33,64 +32,60 @@ class Planner {
* Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low
* level Graph API.
*/
- def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem)
- : Graph[Processor[_ <: Task], _ <: Partitioner] = {
+ def plan(dag: Graph[Op, OpEdge])
+ (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = {
- val opTranslator = new OpTranslator()
-
- val newDag = optimize(dag)
- newDag.mapEdge { (node1, edge, node2) =>
+ val graph = optimize(dag)
+ graph.mapEdge { (node1, edge, node2) =>
edge match {
case Shuffle =>
- node2.head match {
- case groupBy: GroupByOp[Any @unchecked, Any @unchecked] =>
- new GroupByPartitioner(groupBy.fun)
+ node2 match {
+ case groupBy: GroupByOp[_, _] =>
+ new GroupByPartitioner(groupBy.groupByFn)
case _ => new HashPartitioner
}
case Direct =>
new CoLocationPartitioner
}
- }.mapVertex { opChain =>
- opTranslator.translate(opChain)
- }
+ }.mapVertex(_.getProcessor)
}
- private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = {
- val newGraph = dag.mapVertex(op => OpChain(List(op)))
-
- val nodes = newGraph.topologicalOrderWithCirclesIterator.toList.reverse
+ private def optimize(dag: Graph[Op, OpEdge])
+ (implicit system: ActorSystem): Graph[Op, OpEdge] = {
+ val graph = dag.copy
+ val nodes = graph.topologicalOrderWithCirclesIterator.toList.reverse
for (node <- nodes) {
- val outGoingEdges = newGraph.outgoingEdgesOf(node)
+ val outGoingEdges = graph.outgoingEdgesOf(node)
for (edge <- outGoingEdges) {
- merge(newGraph, edge._1, edge._3)
+ merge(graph, edge._1, edge._3)
}
}
- newGraph
+ graph
}
- private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: OpChain)
- : Graph[OpChain, OpEdge] = {
- if (dag.outDegreeOf(node1) == 1 &&
- dag.inDegreeOf(node2) == 1 &&
+ private def merge(graph: Graph[Op, OpEdge], node1: Op, node2: Op)
+ (implicit system: ActorSystem): Unit = {
+ if (graph.outDegreeOf(node1) == 1 &&
+ graph.inDegreeOf(node2) == 1 &&
// For processor node, we don't allow it to merge with downstream operators
- !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) {
- val (_, edge, _) = dag.outgoingEdgesOf(node1).head
+ !node1.isInstanceOf[ProcessorOp[_ <: Task]] &&
+ !node2.isInstanceOf[ProcessorOp[_ <: Task]]) {
+ val (_, edge, _) = graph.outgoingEdgesOf(node1).head
if (edge == Direct) {
- val opList = OpChain(node1.ops ++ node2.ops)
- dag.addVertex(opList)
- for (incomingEdge <- dag.incomingEdgesOf(node1)) {
- dag.addEdge(incomingEdge._1, incomingEdge._2, opList)
+ val chainedOp = node1.chain(node2)
+ graph.addVertex(chainedOp)
+ for (incomingEdge <- graph.incomingEdgesOf(node1)) {
+ graph.addEdge(incomingEdge._1, incomingEdge._2, chainedOp)
}
- for (outgoingEdge <- dag.outgoingEdgesOf(node2)) {
- dag.addEdge(opList, outgoingEdge._2, outgoingEdge._3)
+ for (outgoingEdge <- graph.outgoingEdgesOf(node2)) {
+ graph.addEdge(chainedOp, outgoingEdge._2, outgoingEdge._3)
}
// Remove the old vertex
- dag.removeVertex(node1)
- dag.removeVertex(node2)
+ graph.removeVertex(node1)
+ graph.removeVertex(node2)
}
}
- dag
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
new file mode 100644
index 0000000..609fbb0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.plan.functions
+
+trait SingleInputFunction[IN, OUT] extends Serializable {
+ def process(value: IN): TraversableOnce[OUT]
+ def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
+ new AndThen(this, other)
+ }
+ def finish(): TraversableOnce[OUT] = None
+ def clearState(): Unit = {}
+ def description: String
+}
+
+class AndThen[IN, MIDDLE, OUT](
+ first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
+ extends SingleInputFunction[IN, OUT] {
+
+ override def process(value: IN): TraversableOnce[OUT] = {
+ first.process(value).flatMap(second.process)
+ }
+
+ override def finish(): TraversableOnce[OUT] = {
+ val firstResult = first.finish().flatMap(second.process)
+ if (firstResult.isEmpty) {
+ second.finish()
+ } else {
+ firstResult
+ }
+ }
+
+ override def clearState(): Unit = {
+ first.clearState()
+ second.clearState()
+ }
+
+ override def description: String = {
+ Option(first.description).flatMap { description =>
+ Option(second.description).map(description + "." + _)
+ }.orNull
+ }
+}
+
+class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String)
+ extends SingleInputFunction[IN, OUT] {
+
+ override def process(value: IN): TraversableOnce[OUT] = {
+ fn(value)
+ }
+
+ override def description: String = descriptionMessage
+}
+
+
+class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
+ extends SingleInputFunction[T, T] {
+
+ private var state: Option[T] = None
+
+ override def process(value: T): TraversableOnce[T] = {
+ if (state.isEmpty) {
+ state = Option(value)
+ } else {
+ state = state.map(fn(_, value))
+ }
+ None
+ }
+
+ override def finish(): TraversableOnce[T] = {
+ state
+ }
+
+ override def clearState(): Unit = {
+ state = None
+ }
+
+ override def description: String = descriptionMessage
+}
+
+class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
+
+ override def process(value: T): TraversableOnce[Unit] = {
+ emit(value)
+ None
+ }
+
+ override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = {
+ throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction")
+ }
+
+ override def description: String = ""
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
new file mode 100644
index 0000000..4ee2fa8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.window.api.CountWindowFn
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * This task triggers output on number of messages in a window.
+ */
+class CountTriggerTask[IN, GROUP](
+ groupBy: GroupAlsoByWindow[IN, GROUP],
+ windowRunner: WindowRunner,
+ taskContext: TaskContext,
+ userConfig: UserConfig)
+ extends Task(taskContext, userConfig) {
+
+ def this(groupBy: GroupAlsoByWindow[IN, GROUP],
+ taskContext: TaskContext, userConfig: UserConfig) = {
+ this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
+ taskContext, userConfig)
+ }
+
+ def this(taskContext: TaskContext, userConfig: UserConfig) = {
+ this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
+ GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
+ taskContext, userConfig)
+ }
+
+ private val windowSize = groupBy.window.windowFn.asInstanceOf[CountWindowFn].size
+ private var num = 0
+
+ override def onNext(msg: Message): Unit = {
+ windowRunner.process(msg)
+ num += 1
+ if (windowSize == num) {
+ windowRunner.trigger(Instant.ofEpochMilli(windowSize))
+ num = 0
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
new file mode 100644
index 0000000..4b7649f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * This task triggers output on watermark progress.
+ */
+class EventTimeTriggerTask[IN, GROUP](
+ groupBy: GroupAlsoByWindow[IN, GROUP],
+ windowRunner: WindowRunner,
+ taskContext: TaskContext,
+ userConfig: UserConfig)
+ extends Task(taskContext, userConfig) {
+
+ def this(groupBy: GroupAlsoByWindow[IN, GROUP],
+ taskContext: TaskContext, userConfig: UserConfig) = {
+ this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
+ taskContext, userConfig)
+ }
+
+ def this(taskContext: TaskContext, userConfig: UserConfig) = {
+ this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
+ GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
+ taskContext, userConfig)
+ }
+
+ override def onNext(message: Message): Unit = {
+ windowRunner.process(message)
+ }
+
+ override def onWatermarkProgress(watermark: Instant): Unit = {
+ windowRunner.trigger(watermark)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
new file mode 100644
index 0000000..980a54b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import akka.actor.Actor.Receive
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
+import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFn
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+import scala.concurrent.duration.FiniteDuration
+
+object ProcessingTimeTriggerTask {
+ case object Triggering
+}
+
+/**
+ * This task triggers output on scheduled system time interval.
+ */
+class ProcessingTimeTriggerTask[IN, GROUP](
+ groupBy: GroupAlsoByWindow[IN, GROUP],
+ windowRunner: WindowRunner,
+ taskContext: TaskContext,
+ userConfig: UserConfig)
+ extends Task(taskContext, userConfig) {
+
+ def this(groupBy: GroupAlsoByWindow[IN, GROUP],
+ taskContext: TaskContext, userConfig: UserConfig) = {
+ this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
+ taskContext, userConfig)
+ }
+
+ def this(taskContext: TaskContext, userConfig: UserConfig) = {
+ this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
+ GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
+ taskContext, userConfig)
+ }
+
+ private val windowFn = groupBy.window.windowFn.asInstanceOf[SlidingWindowFn]
+ private val windowSizeMs = windowFn.size.toMillis
+ private val windowStepMs = windowFn.step.toMillis
+
+ override def onStart(startTime: Instant): Unit = {
+ val initialDelay = windowSizeMs - Instant.now.toEpochMilli % windowSizeMs
+ taskContext.scheduleOnce(
+ new FiniteDuration(initialDelay, TimeUnit.MILLISECONDS))(self ! Triggering)
+ }
+
+ override def onNext(message: Message): Unit = {
+ windowRunner.process(message)
+ }
+
+ override def receiveUnManagedMessage: Receive = {
+ case Triggering =>
+ windowRunner.trigger(Instant.now)
+ taskContext.scheduleOnce(
+ new FiniteDuration(windowStepMs, TimeUnit.MILLISECONDS))(self ! Triggering)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
new file mode 100644
index 0000000..e35f085
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * Task that runs an optional [[SingleInputFunction]] over every incoming message.
+ *
+ * With an operator, each value it produces is emitted as a new message; without
+ * one, the input payload is forwarded unchanged. Output messages always carry
+ * the timestamp of the input message.
+ */
+class TransformTask[IN, OUT](
+    operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
+    userConf: UserConfig) extends Task(taskContext, userConf) {
+
+  // Looks the operator up in userConf under GEARPUMP_STREAMING_OPERATOR.
+  def this(taskContext: TaskContext, userConf: UserConfig) = {
+    this(
+      userConf.getValue[SingleInputFunction[IN, OUT]](
+        GEARPUMP_STREAMING_OPERATOR)(taskContext.system),
+      taskContext,
+      userConf)
+  }
+
+  override def onNext(msg: Message): Unit = {
+    val timestamp = msg.timestamp
+
+    operator.fold[Unit] {
+      // No operator configured: pass the payload through.
+      taskContext.output(new Message(msg.msg, timestamp))
+    } { fn =>
+      fn.process(msg.msg.asInstanceOf[IN]).foreach { result =>
+        taskContext.output(new Message(result, timestamp))
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
new file mode 100644
index 0000000..a4524a8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+/**
+ * Selects what happens to a group's reduce state after a window fires.
+ * `Window` uses [[Discarding]] as its default.
+ */
+sealed trait AccumulationMode
+
+/**
+ * State is kept after a window fires: `DefaultWindowRunner` only clears state
+ * in [[Discarding]] mode.
+ */
+case object Accumulating extends AccumulationMode
+
+/**
+ * State is dropped after a window fires: `DefaultWindowRunner` calls
+ * `clearState()` on the group's function once the window has been emitted.
+ */
+case object Discarding extends AccumulationMode
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
new file mode 100644
index 0000000..30e68ba
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.task.Task
+
+/**
+ * Divides messages into groups according to their payload and timestamp.
+ * Check [[org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow]]
+ * for the default implementation.
+ *
+ * @tparam T type of the message payload
+ * @tparam GROUP type of the key a message is assigned to
+ */
+trait GroupByFn[T, GROUP] {
+
+ /**
+ * Computes the group of a message. Used by
+ * 1. GroupByPartitioner to shuffle messages
+ * 2. WindowRunner to group messages for time-based aggregation
+ *
+ * @param message the message to classify
+ * @return the group the message belongs to
+ */
+ def groupBy(message: Message): GROUP
+
+ /**
+ * Returns a Processor according to the window trigger during planning.
+ *
+ * @param parallelism parallelism of the returned Processor
+ * @param description description of the returned Processor
+ * @param userConfig configuration handed to the Processor's tasks
+ * @param system actor system (implicit)
+ * @return the Processor whose task performs the grouping
+ */
+ def getProcessor(parallelism: Int, description: String,
+ userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task]
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
new file mode 100644
index 0000000..9865e18
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+/**
+ * Determines when a window emits its results. `GroupAlsoByWindow.getProcessor`
+ * selects the task implementation from this value.
+ */
+sealed trait Trigger
+
+/** Default trigger of `Window`; planned as an `EventTimeTriggerTask`. */
+case object EventTimeTrigger extends Trigger
+
+/** Planned as a `ProcessingTimeTriggerTask`. */
+case object ProcessingTimeTrigger extends Trigger
+
+/** Used by `CountWindow`; planned as a `CountTriggerTask`. */
+case object CountTrigger extends Trigger
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
new file mode 100644
index 0000000..4b94879
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import java.time.Duration
+
+/**
+ * Definition of a window: how timestamps map to buckets, when results are
+ * emitted and what happens to state afterwards.
+ *
+ * @param windowFn assigns a timestamp to its buckets
+ * @param trigger decides when the window fires; defaults to [[EventTimeTrigger]]
+ * @param accumulationMode whether state is kept or discarded after firing;
+ *                         defaults to [[Discarding]]
+ */
+case class Window(
+    windowFn: WindowFn,
+    trigger: Trigger = EventTimeTrigger,
+    accumulationMode: AccumulationMode = Discarding) {
+
+  /**
+   * Returns a copy of this window that fires on the given trigger.
+   * The window function and accumulation mode are preserved, so
+   * `window.accumulating.triggering(t)` stays accumulating.
+   */
+  def triggering(trigger: Trigger): Window = {
+    copy(trigger = trigger)
+  }
+
+  /** Returns a copy of this window in [[Accumulating]] mode. */
+  def accumulating: Window = {
+    copy(accumulationMode = Accumulating)
+  }
+
+  /** Returns a copy of this window in [[Discarding]] mode. */
+  def discarding: Window = {
+    copy(accumulationMode = Discarding)
+  }
+}
+
+object CountWindow {
+
+  /**
+   * Defines a count-based window.
+   * @param size value passed to the underlying CountWindowFn
+   * @return a Window definition using CountTrigger
+   */
+  def apply(size: Int): Window =
+    Window(windowFn = CountWindowFn(size), trigger = CountTrigger)
+}
+
+object FixedWindow {
+
+  /**
+   * Defines a FixedWindow, expressed as a sliding window whose step
+   * equals its size.
+   * @param size window size
+   * @return a Window definition with the default trigger and accumulation mode
+   */
+  def apply(size: Duration): Window =
+    Window(windowFn = SlidingWindowFn(size = size, step = size))
+}
+
+object SlidingWindow {
+
+  /**
+   * Defines a SlidingWindow.
+   * @param size window size
+   * @param step distance between the starts of consecutive windows
+   * @return a Window definition with the default trigger and accumulation mode
+   */
+  def apply(size: Duration, step: Duration): Window =
+    Window(windowFn = SlidingWindowFn(size = size, step = step))
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
new file mode 100644
index 0000000..0768730
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.dsl.window.impl.Bucket
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Maps a timestamp to the list of [[Bucket]]s it is assigned to.
+ */
+sealed trait WindowFn {
+ // Returns the buckets for the given timestamp.
+ def apply(timestamp: Instant): List[Bucket]
+}
+
+/**
+ * Assigns a timestamp to windows of length `size` whose starts are `step` apart.
+ * Window starts are aligned to multiples of `step` in epoch milliseconds.
+ *
+ * @param size length of each window
+ * @param step distance between the starts of consecutive windows
+ */
+case class SlidingWindowFn(size: Duration, step: Duration)
+  extends WindowFn {
+
+  /** Creates a fixed window, i.e. one whose step equals its size. */
+  def this(size: Duration) = {
+    this(size, size)
+  }
+
+  /**
+   * Returns the buckets for `timestamp`, latest start first.
+   *
+   * When `step < size` windows overlap and the timestamp belongs to several
+   * buckets; all of them are returned.
+   *
+   * NOTE: when `step > size` the timestamp can fall into the gap between two
+   * windows; the bucket with the latest start is still returned, as before.
+   *
+   * @param timestamp the time to assign
+   * @return the buckets assigned to the timestamp
+   */
+  override def apply(timestamp: Instant): List[Bucket] = {
+    val sizeMillis = size.toMillis
+    val stepMillis = step.toMillis
+    val timeMillis = timestamp.toEpochMilli
+    val windows = ArrayBuffer.empty[Bucket]
+    var start = lastStartFor(timeMillis, stepMillis)
+    windows += Bucket.ofEpochMilli(start, start + sizeMillis)
+    start -= stepMillis
+    // Walk back over earlier starts while the window [start, start + size)
+    // still covers the timestamp (the end is exclusive).
+    while (start + sizeMillis > timeMillis) {
+      windows += Bucket.ofEpochMilli(start, start + sizeMillis)
+      start -= stepMillis
+    }
+    windows.toList
+  }
+
+  // Latest window start (a multiple of windowStep) that is not after timestamp.
+  private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = {
+    timestamp - (timestamp + windowStep) % windowStep
+  }
+}
+
+/**
+ * Window function for count-based windows.
+ *
+ * @param size used as the end of the single bucket returned by `apply`
+ */
+case class CountWindowFn(size: Int) extends WindowFn {
+
+  // The timestamp is ignored: every call yields the same bucket from 0 to `size`.
+  override def apply(timestamp: Instant): List[Bucket] =
+    Bucket.ofEpochMilli(0, size) :: Nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
new file mode 100644
index 0000000..e978983
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.dsl.window.api.Trigger
+
+/**
+ * Interface with a per-message hook and a per-trigger hook.
+ * NOTE(review): no implementation or caller is visible in this change;
+ * presumably it is meant to drive a reduce function per window - confirm.
+ */
+trait ReduceFnRunner {
+
+ // Handles one incoming message.
+ def process(message: Message): Unit
+
+ // Handles a trigger; presumably invoked when `trigger` fires - confirm.
+ def onTrigger(trigger: Trigger): Unit
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
new file mode 100644
index 0000000..53cf5d0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, EventTimeTriggerTask, ProcessingTimeTriggerTask}
+import org.apache.gearpump.streaming.task.Task
+
+object Bucket {
+
+  /**
+   * Builds a Bucket from two epoch-millisecond timestamps.
+   *
+   * @param startTime start of the bucket in epoch milliseconds
+   * @param endTime end of the bucket in epoch milliseconds
+   */
+  def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Bucket = {
+    val start = Instant.ofEpochMilli(startTime)
+    val end = Instant.ofEpochMilli(endTime)
+    Bucket(start, end)
+  }
+}
+
+/**
+ * A window unit including startTime and excluding endTime.
+ * Buckets are ordered by start time first and by end time second.
+ */
+case class Bucket(startTime: Instant, endTime: Instant) extends Comparable[Bucket] {
+  override def compareTo(o: Bucket): Int = {
+    val byStart = startTime.compareTo(o.startTime)
+    // Equal starts are ordered by their end time.
+    if (byStart == 0) endTime.compareTo(o.endTime) else byStart
+  }
+}
+
+/**
+ * Groups messages by a user-defined key and by the window buckets their
+ * timestamp falls into.
+ *
+ * @param groupByFn extracts the key from a message payload
+ * @param window window definition supplying the window function and trigger
+ */
+case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Window)
+  extends GroupByFn[T, (GROUP, List[Bucket])] {
+
+  // Pairs the payload's key with the buckets assigned to the message timestamp.
+  override def groupBy(message: Message): (GROUP, List[Bucket]) = {
+    val key = groupByFn(message.msg.asInstanceOf[T])
+    (key, window.windowFn(Instant.ofEpochMilli(message.timestamp)))
+  }
+
+  // Stores this function in the config and picks the task matching the trigger.
+  override def getProcessor(parallelism: Int, description: String,
+      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = {
+    val taskConfig = userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, this)
+    window.trigger match {
+      case EventTimeTrigger =>
+        Processor[EventTimeTriggerTask[T, GROUP]](parallelism, description, taskConfig)
+      case ProcessingTimeTrigger =>
+        Processor[ProcessingTimeTriggerTask[T, GROUP]](parallelism, description, taskConfig)
+      case CountTrigger =>
+        Processor[CountTriggerTask[T, GROUP]](parallelism, description, taskConfig)
+    }
+  }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
new file mode 100644
index 0000000..9af5e61
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.gs.collections.api.block.procedure.Procedure
+import org.apache.gearpump.gs.collections.impl.list.mutable.FastList
+import org.apache.gearpump.gs.collections.impl.map.mutable.UnifiedMap
+import org.apache.gearpump.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.window.api.Discarding
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+/**
+ * Collects messages into windows and emits results when triggered.
+ * See `DefaultWindowRunner` for the implementation in this file.
+ */
+trait WindowRunner {
+
+ // Handles one incoming message.
+ def process(message: Message): Unit
+
+ // Fires the runner for the given time.
+ def trigger(time: Instant): Unit
+
+}
+
+object DefaultWindowRunner {
+
+  private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
+
+  /**
+   * Key of a (bucket, group) pair, ordered by bucket first.
+   *
+   * Within the same bucket, different groups are ordered deterministically by
+   * hash code and then by string form. The previous implementation returned -1
+   * for any two different groups in both directions, which violates the
+   * `Comparable` contract and can make sorted-map lookups miss existing keys.
+   *
+   * @param bucket the window bucket
+   * @param group the user-defined group key
+   */
+  case class WindowGroup[GROUP](bucket: Bucket, group: GROUP)
+    extends Comparable[WindowGroup[GROUP]] {
+
+    override def compareTo(o: WindowGroup[GROUP]): Int = {
+      val byBucket = bucket.compareTo(o.bucket)
+      if (byBucket != 0) {
+        byBucket
+      } else if (group == o.group) {
+        0
+      } else {
+        // Groups differ: never return 0 here, or distinct groups would be merged.
+        val byHash = Integer.compare(group.##, o.group.##)
+        if (byHash != 0) {
+          byHash
+        } else {
+          val byText = s"$group".compareTo(s"${o.group}")
+          // Last resort for unequal groups sharing both hash and string form:
+          // the order is not symmetric in that (very unlikely) case.
+          if (byText != 0) byText else -1
+        }
+      }
+    }
+  }
+}
+
+/**
+ * WindowRunner that buffers inputs per (bucket, group) and, when triggered,
+ * feeds the inputs of every finished bucket through the group's operator.
+ *
+ * @param taskContext used to output results
+ * @param userConfig supplies the operator under GEARPUMP_STREAMING_OPERATOR
+ * @param groupBy assigns each message to a group and its buckets
+ */
+class DefaultWindowRunner[IN, GROUP, OUT](
+    taskContext: TaskContext, userConfig: UserConfig,
+    groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
+  extends WindowRunner {
+  import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner._
+
+  // Buffered inputs, sorted by bucket so the earliest window is the first key.
+  private val windowGroups = new TreeSortedMap[WindowGroup[GROUP], FastList[IN]]
+  // One operator instance per group.
+  private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]
+
+
+  /**
+   * Adds the message payload to every bucket it is assigned to and registers
+   * an operator for its group if there is none yet.
+   */
+  override def process(message: Message): Unit = {
+    val (group, buckets) = groupBy.groupBy(message)
+    buckets.foreach { bucket =>
+      val wg = WindowGroup(bucket, group)
+      val inputs = windowGroups.getOrDefault(wg, new FastList[IN](1))
+      inputs.add(message.msg.asInstanceOf[IN])
+      windowGroups.put(wg, inputs)
+    }
+    // Only look the operator up for unseen groups; passing it straight to
+    // putIfAbsent would evaluate the lookup for every single message.
+    if (!groupFns.containsKey(group)) {
+      groupFns.put(group,
+        userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get)
+    }
+  }
+
+  /**
+   * Emits results for window groups whose bucket has ended at `time`.
+   * Groups are taken in key order and processing stops at the first bucket
+   * whose end is still after `time`. Results are output with `time` as their
+   * timestamp.
+   */
+  override def trigger(time: Instant): Unit = {
+    onTrigger()
+
+    @annotation.tailrec
+    def onTrigger(): Unit = {
+      if (windowGroups.notEmpty()) {
+        val first = windowGroups.firstKey
+        if (!time.isBefore(first.bucket.endTime)) {
+          val inputs = windowGroups.remove(first)
+          val reduceFn = groupFns.get(first.group)
+            .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
+          inputs.forEach(new Procedure[IN] {
+            override def value(t: IN): Unit = {
+              reduceFn.process(t)
+            }
+          })
+          reduceFn.finish()
+          // Discarding mode drops the state once the window has fired.
+          if (groupBy.window.accumulationMode == Discarding) {
+            reduceFn.clearState()
+          }
+          onTrigger()
+        }
+      }
+    }
+
+    def emitResult(result: OUT, time: Instant): Unit = {
+      taskContext.output(Message(result, time.toEpochMilli))
+    }
+  }
+}