You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/03/02 05:54:55 UTC
incubator-gearpump git commit: [GEARPUMP-287] Trigger data process on watermark
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 6a65dcbd7 -> 07d8d51e7
[GEARPUMP-287] Trigger data process on watermark
Author: manuzhang <ow...@gmail.com>
Closes #165 from manuzhang/GEARPUMP-287.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/07d8d51e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/07d8d51e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/07d8d51e
Branch: refs/heads/master
Commit: 07d8d51e7f0ce39564aeb919a61cee35845deacc
Parents: 6a65dcb
Author: manuzhang <ow...@gmail.com>
Authored: Thu Mar 2 13:54:33 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Thu Mar 2 13:54:38 2017 +0800
----------------------------------------------------------------------
.../dsl/plan/functions/FunctionRunner.scala | 8 +++
.../streaming/dsl/task/TransformTask.scala | 70 +++++++++++++++-----
.../dsl/window/impl/WindowRunner.scala | 10 +--
.../streaming/source/DataSourceTask.scala | 28 +++-----
.../dsl/plan/functions/FunctionRunnerSpec.scala | 12 +++-
.../streaming/dsl/task/TransformTaskSpec.scala | 21 ++++--
.../streaming/source/DataSourceTaskSpec.scala | 20 +++---
7 files changed, 112 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
index 36821e4..9dfa6ad 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
@@ -20,6 +20,13 @@ package org.apache.gearpump.streaming.dsl.plan.functions
import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+object FunctionRunner {
+ def withEmitFn[IN, OUT](runner: FunctionRunner[IN, OUT],
+ fn: OUT => Unit): FunctionRunner[IN, Unit] = {
+ AndThen(runner, new Emit(fn))
+ }
+}
+
/**
* Interface to invoke SerializableFunction methods
*
@@ -126,3 +133,4 @@ class Emit[T](emit: T => Unit) extends FunctionRunner[T, Unit] {
override def description: String = ""
}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index c4278dd..ed48dc7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -23,34 +23,74 @@ import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
+import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
import org.apache.gearpump.streaming.task.{Task, TaskContext}
-class TransformTask[IN, OUT](operator: Option[FunctionRunner[IN, OUT]],
+object TransformTask {
+
+ class Transform[IN, OUT](taskContext: TaskContext,
+ operator: Option[FunctionRunner[IN, OUT]],
+ private var buffer: Vector[Message] = Vector.empty[Message]) {
+
+ def onStart(startTime: Instant): Unit = {
+ operator.foreach(_.setup())
+ }
+
+ def onNext(msg: Message): Unit = {
+ buffer +:= msg
+ }
+
+ def onStop(): Unit = {
+ operator.foreach(_.teardown())
+ }
+
+ def onWatermarkProgress(watermark: Instant): Unit = {
+ val watermarkTime = watermark.toEpochMilli
+ var nextBuffer = Vector.empty[Message]
+ val processor = operator.map(FunctionRunner.withEmitFn(_,
+ (out: OUT) => taskContext.output(Message(out, watermarkTime))))
+ buffer.foreach { case message@Message(in, time) =>
+ if (time <= watermarkTime) {
+ processor match {
+ case Some(p) =>
+ // .toList forces eager evaluation
+ p.process(in.asInstanceOf[IN]).toList
+ case None =>
+ taskContext.output(Message(in, watermarkTime))
+ }
+ } else {
+ nextBuffer +:= message
+ }
+ }
+ // .toList forces eager evaluation
+ processor.map(_.finish())
+ buffer = nextBuffer
+ }
+ }
+
+}
+
+class TransformTask[IN, OUT](transform: Transform[IN, OUT],
taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
def this(taskContext: TaskContext, userConf: UserConfig) = {
- this(userConf.getValue[FunctionRunner[IN, OUT]](
- GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
+ this(new Transform(taskContext, userConf.getValue[FunctionRunner[IN, OUT]](
+ GEARPUMP_STREAMING_OPERATOR)(taskContext.system)), taskContext, userConf)
}
override def onStart(startTime: Instant): Unit = {
- operator.foreach(_.setup())
+ transform.onStart(startTime)
}
override def onNext(msg: Message): Unit = {
- val time = msg.timestamp
-
- operator match {
- case Some(op) =>
- op.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
- taskContext.output(Message(msg, time))
- }
- case None =>
- taskContext.output(Message(msg.msg, time))
- }
+ transform.onNext(msg)
}
override def onStop(): Unit = {
- operator.foreach(_.teardown())
+ transform.onStop()
+ }
+
+ override def onWatermarkProgress(watermark: Instant): Unit = {
+ transform.onWatermarkProgress(watermark)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 91edd73..44d724d 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -28,7 +28,7 @@ import com.gs.collections.impl.list.mutable.FastList
import com.gs.collections.impl.map.mutable.UnifiedMap
import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, FunctionRunner}
+import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
import org.apache.gearpump.streaming.dsl.window.api.Discarding
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.util.LogUtil
@@ -114,8 +114,8 @@ class DefaultWindowRunner[IN, GROUP, OUT](
if (!time.isBefore(firstWin.endTime)) {
val inputs = windowInputs.remove(firstWin)
if (groupedFnRunners.containsKey(group)) {
- val reduceFn = AndThen(groupedFnRunners.get(group),
- new Emit[OUT](output => emitResult(output, time)))
+ val reduceFn = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
+ (output: OUT) => taskContext.output(Message(output, time)))
inputs.forEach(new Procedure[IN] {
override def value(t: IN): Unit = {
// .toList forces eager evaluation
@@ -134,9 +134,5 @@ class DefaultWindowRunner[IN, GROUP, OUT](
}
}
}
-
- def emitResult(result: OUT, time: Instant): Unit = {
- taskContext.output(Message(result, time))
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 0d0dfa2..3fceb1a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -24,6 +24,7 @@ import org.apache.gearpump._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
+import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
import org.apache.gearpump.streaming.task.{Task, TaskContext}
/**
@@ -42,49 +43,42 @@ class DataSourceTask[IN, OUT] private[source](
context: TaskContext,
conf: UserConfig,
source: DataSource,
- operator: Option[FunctionRunner[IN, OUT]])
+ transform: Transform[IN, OUT])
extends Task(context, conf) {
def this(context: TaskContext, conf: UserConfig) = {
this(context, conf,
conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get,
- conf.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system)
+ new Transform[IN, OUT](context,
+ conf.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system))
)
}
private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
- private val processMessage: Message => Unit =
- operator match {
- case Some(op) =>
- (message: Message) => {
- op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT =>
- context.output(Message(m, message.timestamp))
- }
- }
- case None =>
- (message: Message) => context.output(message)
- }
-
override def onStart(startTime: Instant): Unit = {
LOG.info(s"opening data source at $startTime")
source.open(context, startTime)
- operator.foreach(_.setup())
+ transform.onStart(startTime)
self ! Watermark(source.getWatermark)
}
override def onNext(m: Message): Unit = {
0.until(batchSize).foreach { _ =>
- Option(source.read()).foreach(processMessage)
+ Option(source.read()).foreach(transform.onNext)
}
self ! Watermark(source.getWatermark)
}
+ override def onWatermarkProgress(watermark: Instant): Unit = {
+ transform.onWatermarkProgress(watermark)
+ }
+
override def onStop(): Unit = {
- operator.foreach(_.teardown())
LOG.info("closing data source...")
+ transform.onStop()
source.close()
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
index 08a259a..d26b7d9 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
@@ -23,11 +23,12 @@ import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.streaming.source.{DataSourceTask, Watermark}
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
import org.apache.gearpump.streaming.dsl.window.api.CountWindows
import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
@@ -255,6 +256,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
taskContext, conf)
source.onStart(Instant.EPOCH)
source.onNext(Message("next"))
+ source.onWatermarkProgress(Watermark.MAX)
data.foreach { s =>
verify(taskContext, times(1)).output(MockUtil.argMatch[Message](
message => message.msg == s))
@@ -268,6 +270,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
another.onStart(Instant.EPOCH)
another.onNext(Message("next"))
+ another.onWatermarkProgress(Watermark.MAX)
data.foreach { s =>
verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message](
message => message.msg == s))
@@ -311,7 +314,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
}
}
- "MergeTask" should {
+ "TransformTask" should {
"accept two stream and apply the attached operator" in {
// Source with transformer
@@ -319,7 +322,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
val conf = UserConfig.empty
val double = new FlatMapper[String, String](FlatMapFunction(
word => List(word, word)), "double")
- val task = new TransformTask[String, String](Some(double), taskContext, conf)
+ val transform = new Transform[String, String](taskContext, Some(double))
+ val task = new TransformTask[String, String](transform, taskContext, conf)
task.onStart(Instant.EPOCH)
val data = "1 2 2 3 3 3".split("\\s+")
@@ -328,6 +332,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
task.onNext(Message(input))
}
+ task.onWatermarkProgress(Watermark.MAX)
+
verify(taskContext, times(data.length * 2)).output(anyObject())
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index 8266df5..67fa375 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -23,6 +23,8 @@ import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
+import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
+import org.apache.gearpump.streaming.source.Watermark
import org.mockito.Mockito.{verify, when}
import org.scalacheck.Gen
import org.scalatest.{Matchers, PropSpec}
@@ -31,13 +33,14 @@ import org.scalatest.prop.PropertyChecks
class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
- property("TransformTask.onStart should call SingleInputFunction.setup") {
+ property("TransformTask should setup functions") {
forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
val taskContext = MockUtil.mockTaskContext
implicit val system = MockUtil.system
val config = UserConfig.empty
val operator = mock[FunctionRunner[Any, Any]]
- val sourceTask = new TransformTask[Any, Any](Some(operator), taskContext, config)
+ val transform = new Transform[Any, Any](taskContext, Some(operator))
+ val sourceTask = new TransformTask[Any, Any](transform, taskContext, config)
sourceTask.onStart(startTime)
@@ -45,28 +48,32 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with
}
}
- property("TransformTask.onNext should call SingleInputFunction.process") {
+ property("TransformTask should process inputs") {
forAll(Gen.alphaStr) { (str: String) =>
val taskContext = MockUtil.mockTaskContext
implicit val system = MockUtil.system
val config = UserConfig.empty
val operator = mock[FunctionRunner[Any, Any]]
- val task = new TransformTask[Any, Any](Some(operator), taskContext, config)
+ val transform = new Transform[Any, Any](taskContext, Some(operator))
+ val task = new TransformTask[Any, Any](transform, taskContext, config)
val msg = Message(str)
when(operator.process(str)).thenReturn(Some(str))
+ when(operator.finish()).thenReturn(None)
task.onNext(msg)
+ task.onWatermarkProgress(Watermark.MAX)
- verify(taskContext).output(msg)
+ verify(taskContext).output(Message(str, Watermark.MAX))
}
}
- property("DataSourceTask.onStop should call SingleInputFunction.setup") {
+ property("TransformTask should teardown functions") {
val taskContext = MockUtil.mockTaskContext
implicit val system = MockUtil.system
val config = UserConfig.empty
val operator = mock[FunctionRunner[Any, Any]]
- val task = new TransformTask[Any, Any](Some(operator), taskContext, config)
+ val transform = new Transform[Any, Any](taskContext, Some(operator))
+ val task = new TransformTask[Any, Any](transform, taskContext, config)
task.onStop()
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index f7f6fd9..3c44c4c 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -24,6 +24,7 @@ import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
+import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
@@ -32,7 +33,7 @@ import org.scalatest.prop.PropertyChecks
class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
- property("DataSourceTask.onStart should call DataSource.open") {
+ property("DataSourceTask should setup data source and Transform") {
forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
val taskContext = MockUtil.mockTaskContext
implicit val system = MockUtil.system
@@ -40,7 +41,8 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
val operator = mock[FunctionRunner[Any, Any]]
- val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator))
+ val transform = new Transform[Any, Any](taskContext, Some(operator))
+ val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
sourceTask.onStart(startTime)
@@ -49,31 +51,33 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
}
}
- property("DataSourceTask.onNext should call DataSource.read") {
+ property("DataSourceTask should read from DataSource and transform inputs") {
forAll(Gen.alphaStr) { (str: String) =>
val taskContext = MockUtil.mockTaskContext
implicit val system = MockUtil.system
val dataSource = mock[DataSource]
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-
- val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None)
+ val transform = new Transform[Any, Any](taskContext, None)
+ val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
val msg = Message(str)
when(dataSource.read()).thenReturn(msg)
sourceTask.onNext(Message("next"))
- verify(taskContext).output(msg)
+ sourceTask.onWatermarkProgress(Watermark.MAX)
+ verify(taskContext).output(Message(str, Watermark.MAX))
}
}
- property("DataSourceTask.onStop should call DataSource.close") {
+ property("DataSourceTask should teardown DataSource and Transform") {
val taskContext = MockUtil.mockTaskContext
implicit val system = MockUtil.system
val dataSource = mock[DataSource]
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
val operator = mock[FunctionRunner[Any, Any]]
- val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator))
+ val transform = new Transform[Any, Any](taskContext, Some(operator))
+ val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
sourceTask.onStop()