You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/08/14 04:07:34 UTC

incubator-gearpump git commit: [GEARPUMP-339] Add ScalaDoc to window api and impl

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master e1228a314 -> c1801595d


[GEARPUMP-339] Add ScalaDoc to window api and impl

Author: manuzhang <ow...@gmail.com>

Closes #213 from manuzhang/add_window_doc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/c1801595
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/c1801595
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/c1801595

Branch: refs/heads/master
Commit: c1801595df62eff8bd8d1b3e51e94aa130fb686f
Parents: e1228a3
Author: manuzhang <ow...@gmail.com>
Authored: Mon Aug 14 12:07:06 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Aug 14 12:07:17 2017 +0800

----------------------------------------------------------------------
 .../dsl/window/api/AccumulationMode.scala       | 10 +++++++
 .../streaming/dsl/window/api/Trigger.scala      |  9 ++++++
 .../dsl/window/api/WindowFunction.scala         | 21 ++++++++++++++
 .../streaming/dsl/window/api/Windows.scala      | 18 ++++++++----
 .../dsl/window/impl/ReduceFnRunner.scala        | 29 --------------------
 .../streaming/dsl/window/impl/Window.scala      |  2 +-
 .../dsl/window/impl/WindowRunner.scala          | 25 ++++++++++++++++-
 7 files changed, 78 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
index a4524a8..46b8e92 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
@@ -17,8 +17,18 @@
  */
 package org.apache.gearpump.streaming.dsl.window.api
 
+
+/**
+ * Determines relationship between multiple results for the same window.
+ */
 sealed trait AccumulationMode
 
+/**
+ * Window results are accumulated.
+ */
 case object Accumulating extends AccumulationMode
 
+/**
+ * Window results are independent.
+ */
 case object Discarding extends AccumulationMode

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
index 02d52a0..b9a8695 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
@@ -17,7 +17,16 @@
  */
 package org.apache.gearpump.streaming.dsl.window.api
 
+/**
+ * Determines when window results are emitted.
+ * For now, [[EventTimeTrigger]] is used for all applications.
+ */
+// TODO: Make this a public API
 sealed trait Trigger
 
+/**
+ * Triggers emitting when the watermark passes the end of the window in event time.
+ */
+// FIXME: This is no more than a tag now and the logic is hard coded in WindowRunner
 case object EventTimeTrigger extends Trigger
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
index 85ca969..4db02e7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -33,8 +33,14 @@ object WindowFunction {
   }
 }
 
+/**
+ * Determines how elements are assigned to windows for calculation.
+ */
 trait WindowFunction {
 
+  /**
+   * Assigns elements into windows.
+   */
   def apply[T](context: WindowFunction.Context[T]): Array[Window]
 
   def isNonMerging: Boolean
@@ -51,6 +57,9 @@ object GlobalWindowFunction {
     Instant.ofEpochMilli(Time.MAX_TIME_MILLIS)))
 }
 
+/**
+ * All elements are assigned to the same global window for calculation.
+ */
 case class GlobalWindowFunction() extends NonMergingWindowFunction {
 
   override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
@@ -58,6 +67,12 @@ case class GlobalWindowFunction() extends NonMergingWindowFunction {
   }
 }
 
+/**
+ * Elements are assigned to non-merging sliding windows for calculation.
+ *
+ * @param size window size
+ * @param step window step to slide forward
+ */
 case class SlidingWindowFunction(size: Duration, step: Duration)
   extends NonMergingWindowFunction {
 
@@ -86,6 +101,12 @@ case class SlidingWindowFunction(size: Duration, step: Duration)
   }
 }
 
+/**
+ * Elements are assigned to merging windows for calculation. Windows are merged
+ * if their distance is within the defined gap.
+ *
+ * @param gap session gap
+ */
 case class SessionWindowFunction(gap: Duration) extends WindowFunction {
 
   override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
index d53bc96..e15b5c4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
@@ -20,11 +20,14 @@ package org.apache.gearpump.streaming.dsl.window.api
 import java.time.Duration
 
 /**
- * Defines how to apply window functions.
+ * User facing Window DSL.
+ * Defines how to apply [[WindowFunction]], [[Trigger]]
+ * and [[AccumulationMode]].
  *
  * @param windowFn how to divide windows
  * @param trigger when to trigger window result
- * @param accumulationMode whether to accumulate results across windows
+ * @param accumulationMode whether to accumulate window results
+ * @param description window description
  */
 case class Windows(
     windowFn: WindowFunction,
@@ -47,6 +50,11 @@ case class Windows(
 
 object GlobalWindows {
 
+  /**
+   * Defines a [[GlobalWindowFunction]].
+   *
+   * @return a Window definition
+   */
   def apply(): Windows = {
     Windows(GlobalWindowFunction(), description = "globalWindows")
   }
@@ -55,7 +63,7 @@ object GlobalWindows {
 object FixedWindows {
 
   /**
-   * Defines a FixedWindow.
+   * Defines a non-overlapping [[SlidingWindowFunction]].
    *
    * @param size window size
    * @return a Window definition
@@ -68,7 +76,7 @@ object FixedWindows {
 object SlidingWindows {
 
   /**
-   * Defines a SlidingWindow.
+   * Defines an overlapping [[SlidingWindowFunction]].
    *
    * @param size window size
    * @param step window step to slide forward
@@ -82,7 +90,7 @@ object SlidingWindows {
 object SessionWindows {
 
   /**
-   * Defines a SessionWindow.
+   * Defines a [[SessionWindowFunction]].
    *
    * @param gap session gap
    * @return a Window definition

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
deleted file mode 100644
index e978983..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.window.impl
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.dsl.window.api.Trigger
-
-trait ReduceFnRunner {
-
-  def process(message: Message): Unit
-
-  def onTrigger(trigger: Trigger): Unit
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index 7536473..d6d08c9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -28,7 +28,7 @@ object Window {
 }
 
 /**
- * A window unit including startTime and excluding endTime.
+ * A window unit from startTime (inclusive) to endTime (exclusive).
  */
 case class Window(startTime: Instant, endTime: Instant) extends Comparable[Window] {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1801595/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 17a9525..ee3c067 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -31,11 +31,23 @@ import org.apache.gearpump.streaming.task.TaskUtil
 
 import scala.collection.mutable.ArrayBuffer
 
+/**
+ * Inputs for [[WindowRunner]].
+ */
 case class TimestampedValue[T](value: T, timestamp: Instant)
 
+/**
+ * Outputs triggered by [[WindowRunner]]
+ */
 case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]],
     watermark: Instant)
 
+/**
+ * This is responsible for executing window calculation.
+ *   1. Groups elements into windows as defined by window function
+ *   2. Applies window calculation to each group
+ *   3. Emits results on triggering
+ */
 trait WindowRunner[IN, OUT] extends java.io.Serializable {
 
   def process(timestampedValue: TimestampedValue[IN]): Unit
@@ -43,6 +55,10 @@ trait WindowRunner[IN, OUT] extends java.io.Serializable {
   def trigger(time: Instant): TriggeredOutputs[OUT]
 }
 
+/**
+ * A composite WindowRunner that first executes its left child and feeds results
+ * into its right child.
+ */
 case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
     right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
 
@@ -57,6 +73,9 @@ case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
   }
 }
 
+/**
+ * Default implementation for [[WindowRunner]].
+ */
 class DefaultWindowRunner[IN, OUT](
     windows: Windows,
     fnRunner: FunctionRunner[IN, OUT])
@@ -137,11 +156,15 @@ class DefaultWindowRunner[IN, OUT](
           }
           onTrigger(outputs, newWmk)
         } else {
-          // minimum of end of last triggered window and start of first un-triggered window
+          // The output watermark is the minimum of end of last triggered window
+          // and start of first un-triggered window
           TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime))
         }
       } else {
+        // All windows have been triggered.
         if (time == Watermark.MAX) {
+          // This means there will be no more inputs
+          // so it's safe to advance to the maximum watermark.
           TriggeredOutputs(outputs, Watermark.MAX)
         } else {
           TriggeredOutputs(outputs, wmk)