You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/05/27 07:40:54 UTC

incubator-gearpump git commit: [GEARPUMP-315] Add GlobalWindows and implement groupBy on it

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master fc8006cea -> c1370d9bf


[GEARPUMP-315] Add GlobalWindows and implement groupBy on it

Author: manuzhang <ow...@gmail.com>

Closes #185 from manuzhang/global_window.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/c1370d9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/c1370d9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/c1370d9b

Branch: refs/heads/master
Commit: c1370d9bf21b62c964d107a1f24765770116a316
Parents: fc8006c
Author: manuzhang <ow...@gmail.com>
Authored: Sat May 27 15:39:00 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat May 27 15:40:24 2017 +0800

----------------------------------------------------------------------
 .../apache/gearpump/streaming/dsl/scalaapi/Stream.scala  |  2 +-
 .../streaming/dsl/window/api/WindowFunction.scala        | 11 ++++++++++-
 .../gearpump/streaming/dsl/window/api/Windows.scala      |  7 +++++++
 .../gearpump/streaming/dsl/scalaapi/StreamSpec.scala     |  4 ++--
 4 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1370d9b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index 82d6beb..e15d4ae 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -188,7 +188,7 @@ class Stream[T](
    */
   def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
       description: String = "groupBy"): Stream[T] = {
-    window(CountWindows.apply(1).accumulating)
+    window(GlobalWindows())
       .groupBy[GROUP](fn, parallelism, description)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1370d9b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
index 73fef5d..7da9c85 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.dsl.window.api
 
 import java.time.{Duration, Instant}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.{MIN_TIME_MILLIS, MAX_TIME_MILLIS, TimeStamp}
 import org.apache.gearpump.streaming.dsl.window.impl.Window
 
 import scala.collection.mutable.ArrayBuffer
@@ -44,6 +44,15 @@ abstract class NonMergingWindowFunction[T] extends WindowFunction[T] {
   override def isNonMerging: Boolean = true
 }
 
+case class GlobalWindowFunction[T]() extends NonMergingWindowFunction[T] {
+
+  override def apply(context: WindowFunction.Context[T]): Array[Window] = {
+    Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS),
+      Instant.ofEpochMilli(MAX_TIME_MILLIS)))
+  }
+
+}
+
 case class SlidingWindowFunction[T](size: Duration, step: Duration)
   extends NonMergingWindowFunction[T] {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1370d9b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
index 5917f09..467f57c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
@@ -52,6 +52,13 @@ object CountWindows {
   }
 }
 
+object GlobalWindows {
+
+  def apply[T](): Windows[T] = {
+    Windows(GlobalWindowFunction())
+  }
+}
+
 object FixedWindows {
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1370d9b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
index fb398b8..4c7e209 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join
-import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
+import org.apache.gearpump.streaming.dsl.task.{EventTimeTriggerTask, TransformTask}
 import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
 import org.apache.gearpump.streaming.source.DataSourceTask
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
@@ -92,7 +92,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
 
   private def getExpectedDagTopology: Graph[String, String] = {
     val source = classOf[DataSourceTask[_, _]].getName
-    val group = classOf[CountTriggerTask[_, _]].getName
+    val group = classOf[EventTimeTriggerTask[_, _]].getName
     val merge = classOf[TransformTask[_, _]].getName
     val join = classOf[Join].getName