You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/03/02 05:54:55 UTC

incubator-gearpump git commit: [GEARPUMP-287] Trigger data process on watermark

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 6a65dcbd7 -> 07d8d51e7


[GEARPUMP-287] Trigger data process on watermark

Author: manuzhang <ow...@gmail.com>

Closes #165 from manuzhang/GEARPUMP-287.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/07d8d51e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/07d8d51e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/07d8d51e

Branch: refs/heads/master
Commit: 07d8d51e7f0ce39564aeb919a61cee35845deacc
Parents: 6a65dcb
Author: manuzhang <ow...@gmail.com>
Authored: Thu Mar 2 13:54:33 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Thu Mar 2 13:54:38 2017 +0800

----------------------------------------------------------------------
 .../dsl/plan/functions/FunctionRunner.scala     |  8 +++
 .../streaming/dsl/task/TransformTask.scala      | 70 +++++++++++++++-----
 .../dsl/window/impl/WindowRunner.scala          | 10 +--
 .../streaming/source/DataSourceTask.scala       | 28 +++-----
 .../dsl/plan/functions/FunctionRunnerSpec.scala | 12 +++-
 .../streaming/dsl/task/TransformTaskSpec.scala  | 21 ++++--
 .../streaming/source/DataSourceTaskSpec.scala   | 20 +++---
 7 files changed, 112 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
index 36821e4..9dfa6ad 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
@@ -20,6 +20,13 @@ package org.apache.gearpump.streaming.dsl.plan.functions
 import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 
+object FunctionRunner {
+  /** Composes `runner` with an `Emit(fn)` stage via `AndThen`. */
+  def withEmitFn[IN, OUT](
+      runner: FunctionRunner[IN, OUT], fn: OUT => Unit): FunctionRunner[IN, Unit] =
+    AndThen(runner, new Emit[OUT](fn))
+}
+
 /**
  * Interface to invoke SerializableFunction methods
  *
@@ -126,3 +133,4 @@ class Emit[T](emit: T => Unit) extends FunctionRunner[T, Unit] {
 
   override def description: String = ""
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index c4278dd..ed48dc7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -23,34 +23,74 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
+import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
-class TransformTask[IN, OUT](operator: Option[FunctionRunner[IN, OUT]],
+object TransformTask {
+
+  class Transform[IN, OUT](taskContext: TaskContext,
+      operator: Option[FunctionRunner[IN, OUT]],
+      private var buffer: Vector[Message] = Vector.empty[Message]) {
+    /** Calls setup() on the wrapped operator, if there is one. */
+    def onStart(startTime: Instant): Unit = {
+      operator.foreach(_.setup())
+    }
+    /** Appends msg to the buffer (arrival order); output happens on watermark progress. */
+    def onNext(msg: Message): Unit = {
+      buffer :+= msg
+    }
+    /** Calls teardown() on the wrapped operator, if there is one. */
+    def onStop(): Unit = {
+      operator.foreach(_.teardown())
+    }
+    /** Processes buffered messages with time <= watermark; later ones stay buffered. */
+    def onWatermarkProgress(watermark: Instant): Unit = {
+      val watermarkTime = watermark.toEpochMilli
+      var nextBuffer = Vector.empty[Message]
+      val processor = operator.map(FunctionRunner.withEmitFn(_,
+        (out: OUT) => taskContext.output(Message(out, watermarkTime))))
+      buffer.foreach { case message@Message(in, time) =>
+        if (time <= watermarkTime) {
+          processor match {
+            case Some(p) =>
+              // .toList forces eager evaluation
+              p.process(in.asInstanceOf[IN]).toList
+            case None =>
+              taskContext.output(Message(in, watermarkTime))
+          }
+        } else {
+          nextBuffer :+= message
+        }
+      }
+      // .toList forces eager evaluation
+      processor.foreach(_.finish().toList)
+      buffer = nextBuffer
+    }
+  }
+
+}
+
+class TransformTask[IN, OUT](transform: Transform[IN, OUT],
     taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
 
   def this(taskContext: TaskContext, userConf: UserConfig) = {
-    this(userConf.getValue[FunctionRunner[IN, OUT]](
-      GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
+    this(new Transform(taskContext, userConf.getValue[FunctionRunner[IN, OUT]](
+      GEARPUMP_STREAMING_OPERATOR)(taskContext.system)), taskContext, userConf)
   }
 
   override def onStart(startTime: Instant): Unit = {
-    operator.foreach(_.setup())
+    transform.onStart(startTime)
   }
 
   override def onNext(msg: Message): Unit = {
-    val time = msg.timestamp
-
-    operator match {
-      case Some(op) =>
-        op.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
-          taskContext.output(Message(msg, time))
-        }
-      case None =>
-        taskContext.output(Message(msg.msg, time))
-    }
+    transform.onNext(msg)
   }
 
   override def onStop(): Unit = {
-    operator.foreach(_.teardown())
+    transform.onStop()
+  }
+
+  override def onWatermarkProgress(watermark: Instant): Unit = {
+    transform.onWatermarkProgress(watermark)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 91edd73..44d724d 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -28,7 +28,7 @@ import com.gs.collections.impl.list.mutable.FastList
 import com.gs.collections.impl.map.mutable.UnifiedMap
 import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, FunctionRunner}
+import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
 import org.apache.gearpump.streaming.dsl.window.api.Discarding
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.LogUtil
@@ -114,8 +114,8 @@ class DefaultWindowRunner[IN, GROUP, OUT](
         if (!time.isBefore(firstWin.endTime)) {
           val inputs = windowInputs.remove(firstWin)
           if (groupedFnRunners.containsKey(group)) {
-            val reduceFn = AndThen(groupedFnRunners.get(group),
-              new Emit[OUT](output => emitResult(output, time)))
+            val reduceFn = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
+              (output: OUT) => taskContext.output(Message(output, time)))
             inputs.forEach(new Procedure[IN] {
               override def value(t: IN): Unit = {
                 // .toList forces eager evaluation
@@ -134,9 +134,5 @@ class DefaultWindowRunner[IN, GROUP, OUT](
         }
       }
     }
-
-    def emitResult(result: OUT, time: Instant): Unit = {
-      taskContext.output(Message(result, time))
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 0d0dfa2..3fceb1a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -24,6 +24,7 @@ import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
+import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 /**
@@ -42,49 +43,42 @@ class DataSourceTask[IN, OUT] private[source](
     context: TaskContext,
     conf: UserConfig,
     source: DataSource,
-    operator: Option[FunctionRunner[IN, OUT]])
+    transform: Transform[IN, OUT])
   extends Task(context, conf) {
 
   def this(context: TaskContext, conf: UserConfig) = {
     this(context, conf,
       conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get,
-      conf.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system)
+      new Transform[IN, OUT](context,
+        conf.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system))
     )
   }
 
   private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
 
-  private val processMessage: Message => Unit =
-    operator match {
-      case Some(op) =>
-        (message: Message) => {
-          op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT =>
-            context.output(Message(m, message.timestamp))
-          }
-        }
-      case None =>
-        (message: Message) => context.output(message)
-    }
-
   override def onStart(startTime: Instant): Unit = {
     LOG.info(s"opening data source at $startTime")
     source.open(context, startTime)
-    operator.foreach(_.setup())
+    transform.onStart(startTime)
 
     self ! Watermark(source.getWatermark)
   }
 
   override def onNext(m: Message): Unit = {
     0.until(batchSize).foreach { _ =>
-      Option(source.read()).foreach(processMessage)
+      Option(source.read()).foreach(transform.onNext)
     }
 
     self ! Watermark(source.getWatermark)
   }
 
+  override def onWatermarkProgress(watermark: Instant): Unit = {
+    transform.onWatermarkProgress(watermark)
+  }
+
   override def onStop(): Unit = {
-    operator.foreach(_.teardown())
     LOG.info("closing data source...")
+    transform.onStop()
     source.close()
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
index 08a259a..d26b7d9 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
@@ -23,11 +23,12 @@ import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.streaming.source.{DataSourceTask, Watermark}
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
 import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
 import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
 import org.apache.gearpump.streaming.dsl.window.api.CountWindows
 import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
@@ -255,6 +256,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
         taskContext, conf)
       source.onStart(Instant.EPOCH)
       source.onNext(Message("next"))
+      source.onWatermarkProgress(Watermark.MAX)
       data.foreach { s =>
         verify(taskContext, times(1)).output(MockUtil.argMatch[Message](
           message => message.msg == s))
@@ -268,6 +270,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
         conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
       another.onStart(Instant.EPOCH)
       another.onNext(Message("next"))
+      another.onWatermarkProgress(Watermark.MAX)
       data.foreach { s =>
         verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message](
           message => message.msg == s))
@@ -311,7 +314,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
     }
   }
 
-  "MergeTask" should {
+  "TransformTask" should {
     "accept two stream and apply the attached operator" in {
 
       // Source with transformer
@@ -319,7 +322,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
       val conf = UserConfig.empty
       val double = new FlatMapper[String, String](FlatMapFunction(
         word => List(word, word)), "double")
-      val task = new TransformTask[String, String](Some(double), taskContext, conf)
+      val transform = new Transform[String, String](taskContext, Some(double))
+      val task = new TransformTask[String, String](transform, taskContext, conf)
       task.onStart(Instant.EPOCH)
 
       val data = "1 2  2  3 3  3".split("\\s+")
@@ -328,6 +332,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
         task.onNext(Message(input))
       }
 
+      task.onWatermarkProgress(Watermark.MAX)
+
       verify(taskContext, times(data.length * 2)).output(anyObject())
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index 8266df5..67fa375 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -23,6 +23,8 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
+import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
+import org.apache.gearpump.streaming.source.Watermark
 import org.mockito.Mockito.{verify, when}
 import org.scalacheck.Gen
 import org.scalatest.{Matchers, PropSpec}
@@ -31,13 +33,14 @@ import org.scalatest.prop.PropertyChecks
 
 class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
-  property("TransformTask.onStart should call SingleInputFunction.setup") {
+  property("TransformTask should setup functions") {
     forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
       val taskContext = MockUtil.mockTaskContext
       implicit val system = MockUtil.system
       val config = UserConfig.empty
       val operator = mock[FunctionRunner[Any, Any]]
-      val sourceTask = new TransformTask[Any, Any](Some(operator), taskContext, config)
+      val transform = new Transform[Any, Any](taskContext, Some(operator))
+      val sourceTask = new TransformTask[Any, Any](transform, taskContext, config)
 
       sourceTask.onStart(startTime)
 
@@ -45,28 +48,32 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with
     }
   }
 
-  property("TransformTask.onNext should call SingleInputFunction.process") {
+  property("TransformTask should process inputs") {
     forAll(Gen.alphaStr) { (str: String) =>
       val taskContext = MockUtil.mockTaskContext
       implicit val system = MockUtil.system
       val config = UserConfig.empty
       val operator = mock[FunctionRunner[Any, Any]]
-      val task = new TransformTask[Any, Any](Some(operator), taskContext, config)
+      val transform = new Transform[Any, Any](taskContext, Some(operator))
+      val task = new TransformTask[Any, Any](transform, taskContext, config)
       val msg = Message(str)
       when(operator.process(str)).thenReturn(Some(str))
+      when(operator.finish()).thenReturn(None)
 
       task.onNext(msg)
+      task.onWatermarkProgress(Watermark.MAX)
 
-      verify(taskContext).output(msg)
+      verify(taskContext).output(Message(str, Watermark.MAX))
     }
   }
 
-  property("DataSourceTask.onStop should call SingleInputFunction.setup") {
+  property("TransformTask should teardown functions") {
     val taskContext = MockUtil.mockTaskContext
     implicit val system = MockUtil.system
     val config = UserConfig.empty
     val operator = mock[FunctionRunner[Any, Any]]
-    val task = new TransformTask[Any, Any](Some(operator), taskContext, config)
+    val transform = new Transform[Any, Any](taskContext, Some(operator))
+    val task = new TransformTask[Any, Any](transform, taskContext, config)
 
     task.onStop()
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index f7f6fd9..3c44c4c 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -24,6 +24,7 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
+import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -32,7 +33,7 @@ import org.scalatest.prop.PropertyChecks
 
 class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
-  property("DataSourceTask.onStart should call DataSource.open") {
+  property("DataSourceTask should setup data source and Transform") {
     forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
       val taskContext = MockUtil.mockTaskContext
       implicit val system = MockUtil.system
@@ -40,7 +41,8 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
       val config = UserConfig.empty
         .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
       val operator = mock[FunctionRunner[Any, Any]]
-      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator))
+      val transform = new Transform[Any, Any](taskContext, Some(operator))
+      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
 
       sourceTask.onStart(startTime)
 
@@ -49,31 +51,33 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
     }
   }
 
-  property("DataSourceTask.onNext should call DataSource.read") {
+  property("DataSourceTask should read from DataSource and transform inputs") {
     forAll(Gen.alphaStr) { (str: String) =>
       val taskContext = MockUtil.mockTaskContext
       implicit val system = MockUtil.system
       val dataSource = mock[DataSource]
       val config = UserConfig.empty
         .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-
-      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None)
+      val transform = new Transform[Any, Any](taskContext, None)
+      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
       val msg = Message(str)
       when(dataSource.read()).thenReturn(msg)
 
       sourceTask.onNext(Message("next"))
-      verify(taskContext).output(msg)
+      sourceTask.onWatermarkProgress(Watermark.MAX)
+      verify(taskContext).output(Message(str, Watermark.MAX))
     }
   }
 
-  property("DataSourceTask.onStop should call DataSource.close") {
+  property("DataSourceTask should teardown DataSource and Transform") {
     val taskContext = MockUtil.mockTaskContext
     implicit val system = MockUtil.system
     val dataSource = mock[DataSource]
     val config = UserConfig.empty
       .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
     val operator = mock[FunctionRunner[Any, Any]]
-    val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator))
+    val transform = new Transform[Any, Any](taskContext, Some(operator))
+    val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
 
     sourceTask.onStop()