You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/05/27 07:40:54 UTC
incubator-gearpump git commit: [GEARPUMP-315] Add GlobalWindows and
implement groupBy on it
Repository: incubator-gearpump
Updated Branches:
refs/heads/master fc8006cea -> c1370d9bf
[GEARPUMP-315] Add GlobalWindows and implement groupBy on it
Author: manuzhang <ow...@gmail.com>
Closes #185 from manuzhang/global_window.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/c1370d9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/c1370d9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/c1370d9b
Branch: refs/heads/master
Commit: c1370d9bf21b62c964d107a1f24765770116a316
Parents: fc8006c
Author: manuzhang <ow...@gmail.com>
Authored: Sat May 27 15:39:00 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat May 27 15:40:24 2017 +0800
----------------------------------------------------------------------
.../apache/gearpump/streaming/dsl/scalaapi/Stream.scala | 2 +-
.../streaming/dsl/window/api/WindowFunction.scala | 11 ++++++++++-
.../gearpump/streaming/dsl/window/api/Windows.scala | 7 +++++++
.../gearpump/streaming/dsl/scalaapi/StreamSpec.scala | 4 ++--
4 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1370d9b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index 82d6beb..e15d4ae 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -188,7 +188,7 @@ class Stream[T](
*/
def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
description: String = "groupBy"): Stream[T] = {
- window(CountWindows.apply(1).accumulating)
+ window(GlobalWindows())
.groupBy[GROUP](fn, parallelism, description)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1370d9b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
index 73fef5d..7da9c85 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.dsl.window.api
import java.time.{Duration, Instant}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.{MIN_TIME_MILLIS, MAX_TIME_MILLIS, TimeStamp}
import org.apache.gearpump.streaming.dsl.window.impl.Window
import scala.collection.mutable.ArrayBuffer
@@ -44,6 +44,15 @@ abstract class NonMergingWindowFunction[T] extends WindowFunction[T] {
override def isNonMerging: Boolean = true
}
+case class GlobalWindowFunction[T]() extends NonMergingWindowFunction[T] {
+
+ override def apply(context: WindowFunction.Context[T]): Array[Window] = {
+ Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS),
+ Instant.ofEpochMilli(MAX_TIME_MILLIS)))
+ }
+
+}
+
case class SlidingWindowFunction[T](size: Duration, step: Duration)
extends NonMergingWindowFunction[T] {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1370d9b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
index 5917f09..467f57c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
@@ -52,6 +52,13 @@ object CountWindows {
}
}
+object GlobalWindows {
+
+ def apply[T](): Windows[T] = {
+ Windows(GlobalWindowFunction())
+ }
+}
+
object FixedWindows {
/**
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1370d9b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
index fb398b8..4c7e209 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join
-import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
+import org.apache.gearpump.streaming.dsl.task.{EventTimeTriggerTask, TransformTask}
import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
import org.apache.gearpump.streaming.source.DataSourceTask
import org.apache.gearpump.streaming.task.{Task, TaskContext}
@@ -92,7 +92,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
private def getExpectedDagTopology: Graph[String, String] = {
val source = classOf[DataSourceTask[_, _]].getName
- val group = classOf[CountTriggerTask[_, _]].getName
+ val group = classOf[EventTimeTriggerTask[_, _]].getName
val merge = classOf[TransformTask[_, _]].getName
val join = classOf[Join].getName