You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/06/14 02:07:18 UTC
[1/2] incubator-gearpump git commit: [GEARPUMP-316] Decouple groupBy from window
Repository: incubator-gearpump
Updated Branches:
refs/heads/master c1370d9bf -> 24e1a4546
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index d007e09..ca0135d 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -25,13 +25,13 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.Processor.DefaultProcessor
import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask}
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FunctionRunner}
+import org.apache.gearpump.streaming.dsl.plan.functions.{DummyRunner, FlatMapper, FunctionRunner}
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
import org.scalatest.mock.MockitoSugar
@@ -61,16 +61,16 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
"DataSourceOp" should {
- "chain ChainableOp" in {
+ "chain TransformOp" in {
val dataSource = new AnySource
val dataSourceOp = DataSourceOp(dataSource)
- val chainableOp = mock[ChainableOp[Any, Any]]
+ val transformOp = mock[TransformOp[Any, Any]]
val fn = mock[FunctionRunner[Any, Any]]
+ when(transformOp.fn).thenReturn(fn)
- val chainedOp = dataSourceOp.chain(chainableOp)
+ val chainedOp = dataSourceOp.chain(transformOp)
chainedOp shouldBe a[DataSourceOp]
- verify(chainableOp).fn
unchainableOps.foreach { op =>
intercept[OpChainException] {
@@ -79,13 +79,13 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
}
}
- "get Processor of DataSource" in {
+ "be translated into processor" in {
val dataSource = new AnySource
val dataSourceOp = DataSourceOp(dataSource)
- val processor = dataSourceOp.getProcessor
+ val processor = dataSourceOp.toProcessor
processor shouldBe a[Processor[_]]
processor.parallelism shouldBe dataSourceOp.parallelism
- processor.description shouldBe dataSourceOp.description
+ processor.description shouldBe s"${dataSourceOp.description}.globalWindows"
}
}
@@ -94,7 +94,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
"not chain any Op" in {
val dataSink = new AnySink
val dataSinkOp = DataSinkOp(dataSink)
- val chainableOp = mock[ChainableOp[Any, Any]]
+ val chainableOp = mock[TransformOp[Any, Any]]
val ops = chainableOp +: unchainableOps
ops.foreach { op =>
intercept[OpChainException] {
@@ -103,10 +103,10 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
}
}
- "get Processor of DataSink" in {
+ "be translated to processor" in {
val dataSink = new AnySink
val dataSinkOp = DataSinkOp(dataSink)
- val processor = dataSinkOp.getProcessor
+ val processor = dataSinkOp.toProcessor
processor shouldBe a[Processor[_]]
processor.parallelism shouldBe dataSinkOp.parallelism
processor.description shouldBe dataSinkOp.description
@@ -117,7 +117,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
"not chain any Op" in {
val processorOp = new ProcessorOp[AnyTask]
- val chainableOp = mock[ChainableOp[Any, Any]]
+ val chainableOp = mock[TransformOp[Any, Any]]
val ops = chainableOp +: unchainableOps
ops.foreach { op =>
intercept[OpChainException] {
@@ -126,41 +126,41 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
}
}
- "get Processor" in {
+ "be translated into processor" in {
val processorOp = new ProcessorOp[AnyTask]
- val processor = processorOp.getProcessor
+ val processor = processorOp.toProcessor
processor shouldBe a [DefaultProcessor[_]]
processor.parallelism shouldBe processorOp.parallelism
processor.description shouldBe processorOp.description
}
}
- "ChainableOp" should {
+ "TransformOp" should {
- "chain ChainableOp" in {
+ "chain TransformOp" in {
val fn1 = mock[FunctionRunner[Any, Any]]
- val chainableOp1 = ChainableOp[Any, Any](fn1)
+ val transformOp1 = TransformOp[Any, Any](fn1)
val fn2 = mock[FunctionRunner[Any, Any]]
- val chainableOp2 = ChainableOp[Any, Any](fn2)
+ val transformOp2 = TransformOp[Any, Any](fn2)
- val chainedOp = chainableOp1.chain(chainableOp2)
+ val chainedOp = transformOp1.chain(transformOp2)
- chainedOp shouldBe a[ChainableOp[_, _]]
+ chainedOp shouldBe a[TransformOp[_, _]]
unchainableOps.foreach { op =>
intercept[OpChainException] {
- chainableOp1.chain(op)
+ transformOp1.chain(op)
}
}
}
- "get Processor" in {
+ "be translated to processor" in {
val fn = mock[FlatMapFunction[Any, Any]]
val flatMapper = new FlatMapper(fn, "flatMap")
- val chainableOp = ChainableOp[Any, Any](flatMapper)
+ val transformOp = TransformOp[Any, Any](flatMapper)
- val processor = chainableOp.getProcessor
+ val processor = transformOp.toProcessor
processor shouldBe a[Processor[_]]
processor.parallelism shouldBe 1
}
@@ -168,14 +168,16 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
"GroupByOp" should {
- "chain ChainableOp" in {
- val groupBy = mock[GroupAlsoByWindow[Any, Any]]
- val groupByOp = GroupByOp[Any, Any](groupBy)
- val fn = mock[FunctionRunner[Any, Any]]
- val chainableOp = mock[ChainableOp[Any, Any]]
- when(chainableOp.fn).thenReturn(fn)
+ val groupBy = (any: Any) => any
+ val groupByOp = GroupByOp[Any, Any](groupBy)
+
+ "chain WindowTransformOp" in {
- val chainedOp = groupByOp.chain(chainableOp)
+ val runner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner())
+ val windowTransformOp = mock[WindowTransformOp[Any, Any]]
+ when(windowTransformOp.windowRunner).thenReturn(runner)
+
+ val chainedOp = groupByOp.chain(windowTransformOp)
chainedOp shouldBe a[GroupByOp[_, _]]
unchainableOps.foreach { op =>
@@ -185,25 +187,23 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
}
}
- "delegate to groupByFn on getProcessor" in {
- val groupBy = mock[GroupAlsoByWindow[Any, Any]]
- val groupByOp = GroupByOp[Any, Any](groupBy)
-
- groupByOp.getProcessor
- verify(groupBy).getProcessor(anyInt, anyString, any[UserConfig])(any[ActorSystem])
+ "be translated to processor" in {
+ val processor = groupByOp.toProcessor
+ processor shouldBe a[Processor[_]]
+ processor.parallelism shouldBe 1
}
}
"MergeOp" should {
- val mergeOp = MergeOp("merge")
+ val mergeOp = MergeOp()
- "chain ChainableOp" in {
- val fn = mock[FunctionRunner[Any, Any]]
- val chainableOp = mock[ChainableOp[Any, Any]]
- when(chainableOp.fn).thenReturn(fn)
+ "chain WindowTransformOp" in {
+ val runner = mock[WindowRunner[Any, Any]]
+ val windowTransformOp = mock[WindowTransformOp[Any, Any]]
+ when(windowTransformOp.windowRunner).thenReturn(runner)
- val chainedOp = mergeOp.chain(chainableOp)
+ val chainedOp = mergeOp.chain(windowTransformOp)
chainedOp shouldBe a [MergeOp]
unchainableOps.foreach { op =>
@@ -213,8 +213,8 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
}
}
- "get Processor" in {
- val processor = mergeOp.getProcessor
+ "be translated to processor" in {
+ val processor = mergeOp.toProcessor
processor shouldBe a[Processor[_]]
processor.parallelism shouldBe 1
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index 70abde9..70d21b5 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -24,16 +24,14 @@ import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
-import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, GroupByPartitioner}
import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FoldRunner}
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.window.api.CountWindows
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.Graph
import org.scalatest.mock.MockitoSugar
@@ -58,10 +56,11 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
"Planner" should "chain operations" in {
val graph = Graph.empty[Op, OpEdge]
val sourceOp = DataSourceOp(new AnySource)
- val groupBy = GroupAlsoByWindow((any: Any) => any, CountWindows.apply[Any](1))
+ val groupBy = (any: Any) => any
val groupByOp = GroupByOp(groupBy)
- val flatMapOp = ChainableOp[Any, Any](anyFlatMapper)
- val reduceOp = ChainableOp[Any, Option[Any]](anyReducer)
+ val windowOp = WindowOp(GlobalWindows())
+ val flatMapOp = TransformOp[Any, Any](anyFlatMapper)
+ val reduceOp = TransformOp[Any, Option[Any]](anyReducer)
val processorOp = new ProcessorOp[AnyTask]
val sinkOp = DataSinkOp(new AnySink)
val directEdge = Direct
@@ -70,8 +69,10 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
graph.addVertex(sourceOp)
graph.addVertex(groupByOp)
graph.addEdge(sourceOp, shuffleEdge, groupByOp)
+ graph.addVertex(windowOp)
+ graph.addEdge(groupByOp, directEdge, windowOp)
graph.addVertex(flatMapOp)
- graph.addEdge(groupByOp, directEdge, flatMapOp)
+ graph.addEdge(windowOp, directEdge, flatMapOp)
graph.addVertex(reduceOp)
graph.addEdge(flatMapOp, directEdge, reduceOp)
graph.addVertex(processorOp)
@@ -86,9 +87,11 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
.mapVertex(_.description)
plan.vertices.toSet should contain theSameElementsAs
- Set("source", "groupBy", "processor", "sink")
- plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]]
- plan.outgoingEdgesOf("groupBy").iterator.next()._2 shouldBe a[CoLocationPartitioner]
+ Set("source.globalWindows", "groupBy.globalWindows.flatMap.reduce", "processor", "sink")
+ plan.outgoingEdgesOf("source.globalWindows").iterator.next()._2 shouldBe
+ a[GroupByPartitioner[_, _]]
+ plan.outgoingEdgesOf("groupBy.globalWindows.flatMap.reduce").iterator.next()._2 shouldBe
+ a[CoLocationPartitioner]
plan.outgoingEdgesOf("processor").iterator.next()._2 shouldBe a[CoLocationPartitioner]
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
index f5d7c20..6244224 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
@@ -19,28 +19,22 @@ package org.apache.gearpump.streaming.dsl.plan.functions
import java.time.Instant
-import akka.actor.ActorSystem
import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.source.{DataSourceTask, Watermark}
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.api.functions.{FoldFunction, ReduceFunction}
import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
-import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
-import org.apache.gearpump.streaming.dsl.window.api.CountWindows
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
-import org.mockito.ArgumentCaptor
+import org.apache.gearpump.streaming.dsl.task.TransformTask
+import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{Matchers, WordSpec}
import org.scalatest.mock.MockitoSugar
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunnerSpec._
@@ -216,40 +210,6 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
}
}
- "Emit" should {
-
- val emitFunction = mock[T => Unit]
- val emit = new Emit[T](emitFunction)
-
- "emit input value when processing input value" in {
- val input = mock[T]
-
- emit.process(input) shouldBe List.empty[Unit]
-
- verify(emitFunction).apply(input)
- }
-
- "return empty description" in {
- emit.description shouldBe ""
- }
-
- "return None on finish" in {
- emit.finish() shouldBe List.empty[Unit]
- }
-
- "do nothing on setup" in {
- emit.setup()
-
- verifyZeroInteractions(emitFunction)
- }
-
- "do nothing on teardown" in {
- emit.teardown()
-
- verifyZeroInteractions(emitFunction)
- }
- }
-
"Source" should {
"iterate over input source and apply attached operator" in {
@@ -258,7 +218,11 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
val data = "one two three".split("\\s+")
val dataSource = new CollectionDataSource[String](data)
- val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
+ val runner1 = new DefaultWindowRunner[String, String](
+ GlobalWindows(), new DummyRunner[String])
+ val conf = UserConfig.empty
+ .withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
+ .withValue[WindowRunner[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1)
// Source with no transformer
val source = new DataSourceTask[String, String](
@@ -275,8 +239,10 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
val anotherTaskContext = MockUtil.mockTaskContext
val double = new FlatMapper[String, String](FlatMapFunction(
word => List(word, word)), "double")
+ val runner2 = new DefaultWindowRunner[String, String](
+ GlobalWindows(), double)
val another = new DataSourceTask(anotherTaskContext,
- conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
+ conf.withValue(GEARPUMP_STREAMING_OPERATOR, runner2))
another.onStart(Instant.EPOCH)
another.onNext(Message("next"))
another.onWatermarkProgress(Watermark.MAX)
@@ -287,44 +253,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
}
}
- "CountTriggerTask" should {
- "group input by groupBy Function and " +
- "apply attached operator for each group" in {
-
- val data = "1 2 2 3 3 3"
-
- val concat = new FoldRunner[String, Option[String]](ReduceFunction({ (left, right) =>
- left + right}), "concat")
-
- implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- val config = UserConfig.empty.withValue[FunctionRunner[String, Option[String]]](
- GEARPUMP_STREAMING_OPERATOR, concat)
-
- val taskContext = MockUtil.mockTaskContext
-
- val groupBy = GroupAlsoByWindow((input: String) => input,
- CountWindows.apply[String](1).accumulating)
- val task = new CountTriggerTask[String, String](groupBy, taskContext, config)
- task.onStart(Instant.EPOCH)
-
- val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
-
- data.split("\\s+").foreach { word =>
- task.onNext(Message(word))
- }
- verify(taskContext, times(6)).output(peopleCaptor.capture())
-
- import scala.collection.JavaConverters._
-
- val values = peopleCaptor.getAllValues.asScala.map(input =>
- input.value.asInstanceOf[Option[String]].get)
- assert(values.mkString(",") == "1,2,22,3,33,333")
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
- }
- "TransformTask" should {
+ "MergeTask" should {
"accept two stream and apply the attached operator" in {
// Source with transformer
@@ -332,7 +262,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
val conf = UserConfig.empty
val double = new FlatMapper[String, String](FlatMapFunction(
word => List(word, word)), "double")
- val transform = new Transform[String, String](taskContext, Some(double))
+ val transform = new DefaultWindowRunner[String, String](GlobalWindows(), double)
val task = new TransformTask[String, String](transform, taskContext, conf)
task.onStart(Instant.EPOCH)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
index 5b90a3e..c8c8b9f 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
@@ -61,9 +61,9 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M
dag.vertices.size shouldBe 2
dag.vertices.foreach { processor =>
processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
- if (processor.description == "A") {
+ if (processor.description == "A.globalWindows") {
processor.parallelism shouldBe 2
- } else if (processor.description == "B") {
+ } else if (processor.description == "B.globalWindows") {
processor.parallelism shouldBe 3
} else {
fail(s"undefined source ${processor.description}")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
index 4c7e209..ef8f932 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -22,10 +22,9 @@ import akka.actor._
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join
-import org.apache.gearpump.streaming.dsl.task.{EventTimeTriggerTask, TransformTask}
-import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
+import org.apache.gearpump.streaming.dsl.task.{GroupByTask, TransformTask}
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, GroupByPartitioner, HashPartitioner, PartitionerDescription}
import org.apache.gearpump.streaming.source.DataSourceTask
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
@@ -92,7 +91,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
private def getExpectedDagTopology: Graph[String, String] = {
val source = classOf[DataSourceTask[_, _]].getName
- val group = classOf[EventTimeTriggerTask[_, _]].getName
+ val group = classOf[GroupByTask[_, _, _]].getName
val merge = classOf[TransformTask[_, _]].getName
val join = classOf[Join].getName
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
deleted file mode 100644
index 1a4958a..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.Instant
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.api.CountWindows
-import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class CountTriggerTaskSpec extends PropSpec with PropertyChecks
- with Matchers with MockitoSugar {
-
- property("CountTriggerTask should trigger output by number of messages in a window") {
-
- implicit val system = MockUtil.system
-
- val numGen = Gen.chooseNum[Int](1, 1000)
-
- forAll(numGen, numGen) { (windowSize: Int, msgNum: Int) =>
-
- val groupBy = mock[GroupAlsoByWindow[Any, Any]]
- val window = CountWindows.apply[Any](windowSize)
- when(groupBy.window).thenReturn(window)
- val windowRunner = mock[WindowRunner]
- val userConfig = UserConfig.empty
-
- val task = new CountTriggerTask[Any, Any](groupBy, windowRunner,
- MockUtil.mockTaskContext, userConfig)
- val message = mock[Message]
-
- for (i <- 1 to msgNum) {
- task.onNext(message)
- }
- verify(windowRunner, times(msgNum)).process(message)
- verify(windowRunner, times(msgNum / windowSize)).trigger(Instant.ofEpochMilli(windowSize))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
deleted file mode 100644
index 9414c76..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.{Duration, Instant}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindows}
-import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.{Matchers, PropSpec}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-
-class EventTimeTriggerTaskSpec extends PropSpec with PropertyChecks
- with Matchers with MockitoSugar {
-
- property("EventTimeTriggerTask should trigger on watermark") {
- val longGen = Gen.chooseNum[Long](1L, 1000L)
- val windowSizeGen = longGen
- val windowStepGen = longGen
- val watermarkGen = longGen.map(Instant.ofEpochMilli)
-
- forAll(windowSizeGen, windowStepGen, watermarkGen) {
- (windowSize: Long, windowStep: Long, watermark: Instant) =>
-
- val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize),
- Duration.ofMillis(windowStep)).triggering(EventTimeTrigger)
- val groupBy = mock[GroupAlsoByWindow[Any, Any]]
- val windowRunner = mock[WindowRunner]
- val context = MockUtil.mockTaskContext
- val config = UserConfig.empty
-
- when(groupBy.window).thenReturn(window)
-
- val task = new EventTimeTriggerTask[Any, Any](groupBy, windowRunner, context, config)
-
- val message = mock[Message]
- task.onNext(message)
- verify(windowRunner).process(message)
-
- task.onWatermarkProgress(watermark)
- verify(windowRunner).trigger(any[Instant])
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
new file mode 100644
index 0000000..0f87a1c
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner
+import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
+import org.apache.gearpump.streaming.{Constants, MockUtil}
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
+import org.apache.gearpump.streaming.source.Watermark
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+class GroupByTaskSpec extends PropSpec with PropertyChecks
+ with Matchers with MockitoSugar {
+
+ property("GroupByTask should trigger on watermark") {
+ val longGen = Gen.chooseNum[Long](1L, 1000L).map(Instant.ofEpochMilli)
+
+ forAll(longGen) { (time: Instant) =>
+ val groupBy = mock[Any => Int]
+ val windowRunner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner[Any])
+ val context = MockUtil.mockTaskContext
+ val config = UserConfig.empty
+ .withValue(
+ Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner)(MockUtil.system)
+
+ val task = new GroupByTask[Any, Int, Any](groupBy, context, config)
+ val value = time
+ val message = Message(value, time)
+ when(groupBy(time)).thenReturn(0)
+ task.onNext(message)
+
+ task.onWatermarkProgress(Watermark.MAX)
+ verify(context).output(message)
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
deleted file mode 100644
index cbc9e0c..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.{Duration, Instant}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
-import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindows}
-import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.{Matchers, PropSpec}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-
-class ProcessingTimeTriggerTaskSpec extends PropSpec with PropertyChecks
- with Matchers with MockitoSugar {
-
- property("ProcessingTimeTriggerTask should trigger on system time interval") {
- val longGen = Gen.chooseNum[Long](1L, 1000L)
- val windowSizeGen = longGen
- val windowStepGen = longGen
- val startTimeGen = longGen.map(Instant.ofEpochMilli)
-
- forAll(windowSizeGen, windowStepGen, startTimeGen) {
- (windowSize: Long, windowStep: Long, startTime: Instant) =>
-
- val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize),
- Duration.ofMillis(windowStep)).triggering(ProcessingTimeTrigger)
- val groupBy = mock[GroupAlsoByWindow[Any, Any]]
- val windowRunner = mock[WindowRunner]
- val context = MockUtil.mockTaskContext
- val config = UserConfig.empty
-
- when(groupBy.window).thenReturn(window)
-
- val task = new ProcessingTimeTriggerTask[Any, Any](groupBy, windowRunner, context, config)
-
- task.onStart(startTime)
-
- val message = mock[Message]
- task.onNext(message)
- verify(windowRunner).process(message)
-
- task.receiveUnManagedMessage(Triggering)
- verify(windowRunner).trigger(any[Instant])
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index 481925a..6b66f01 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -22,11 +22,9 @@ import java.time.Instant
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
-import org.apache.gearpump.streaming.source.Watermark
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
import org.mockito.{Matchers => MockitoMatchers}
-import org.mockito.Mockito.{times, verify, when}
+import org.mockito.Mockito.{verify, when}
import org.scalacheck.Gen
import org.scalatest.{Matchers, PropSpec}
import org.scalatest.mock.MockitoSugar
@@ -34,43 +32,25 @@ import org.scalatest.prop.PropertyChecks
class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
- private val timeGen = Gen.chooseNum[Long](Watermark.MIN.toEpochMilli,
- Watermark.MAX.toEpochMilli - 1).map(Instant.ofEpochMilli)
- private val runnerGen = {
- val runner = mock[FunctionRunner[Any, Any]]
- Gen.oneOf(Some(runner), None)
- }
-
- property("TransformTask should emit on watermark") {
- val msgGen = for {
- str <- Gen.alphaStr.suchThat(!_.isEmpty)
- t <- timeGen
- } yield Message(s"$str:$t", t)
- val msgsGen = Gen.listOfN(10, msgGen)
-
- forAll(runnerGen, msgsGen) {
- (runner: Option[FunctionRunner[Any, Any]], msgs: List[Message]) =>
- val taskContext = MockUtil.mockTaskContext
- implicit val system = MockUtil.system
- val config = UserConfig.empty
- val transform = new Transform[Any, Any](taskContext, runner)
- val task = new TransformTask[Any, Any](transform, taskContext, config)
-
- msgs.foreach(task.onNext)
-
- runner.foreach(r => when(r.finish()).thenReturn(None))
- task.onWatermarkProgress(Watermark.MIN)
- verify(taskContext, times(0)).output(MockitoMatchers.any[Message])
-
- msgs.foreach { msg =>
- runner.foreach(r =>
- when(r.process(msg.value)).thenReturn(Some(msg.value)))
- }
- task.onWatermarkProgress(Watermark.MAX)
-
- msgs.foreach { msg =>
- verify(taskContext).output(MockitoMatchers.eq(msg))
- }
+ property("MergeTask should trigger on watermark") {
+ val longGen = Gen.chooseNum[Long](1L, 1000L)
+ val watermarkGen = longGen.map(Instant.ofEpochMilli)
+
+ forAll(watermarkGen) { (watermark: Instant) =>
+ val windowRunner = mock[WindowRunner[Any, Any]]
+ val context = MockUtil.mockTaskContext
+ val config = UserConfig.empty
+ val task = new TransformTask[Any, Any](windowRunner, context, config)
+ val time = watermark.minusMillis(1L)
+ val value: Any = time
+ val message = Message(value, time)
+
+ task.onNext(message)
+ verify(windowRunner).process(TimestampedValue(value, time))
+
+ when(windowRunner.trigger(watermark)).thenReturn(Some(TimestampedValue(value, time)))
+ task.onWatermarkProgress(watermark)
+ verify(context).output(message)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
index fbbee3e..98e9919 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
@@ -21,18 +21,15 @@ package org.apache.gearpump.streaming.dsl.window.impl
import java.time.{Duration, Instant}
import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
-import org.apache.gearpump.streaming.{Constants, MockUtil}
+import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.dsl.plan.functions.FoldRunner
import org.apache.gearpump.streaming.dsl.window.api.SessionWindows
import org.apache.gearpump.streaming.source.Watermark
-import org.mockito.Mockito.{times, verify}
import org.scalatest.{Matchers, PropSpec}
import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
-
class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks
with Matchers with MockitoSugar {
@@ -40,34 +37,25 @@ class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks
val data = List(
Message(("foo", 1L), Instant.ofEpochMilli(1L)),
- Message(("bar", 1L), Instant.ofEpochMilli(8L)),
Message(("foo", 1L), Instant.ofEpochMilli(15L)),
- Message(("bar", 1L), Instant.ofEpochMilli(17L)),
- Message(("bar", 1L), Instant.ofEpochMilli(18L)),
Message(("foo", 1L), Instant.ofEpochMilli(25L)),
- Message(("foo", 1L), Instant.ofEpochMilli(26L)),
- Message(("bar", 1L), Instant.ofEpochMilli(30L)),
- Message(("bar", 1L), Instant.ofEpochMilli(31L))
+ Message(("foo", 1L), Instant.ofEpochMilli(26L))
)
type KV = (String, Long)
- val taskContext = MockUtil.mockTaskContext
implicit val system = MockUtil.system
val reduce = ReduceFunction[KV]((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))
- val operator = new FoldRunner(reduce, "reduce")
- val userConfig = UserConfig.empty.withValue(
- Constants.GEARPUMP_STREAMING_OPERATOR, operator)
- val windows = SessionWindows.apply[KV](Duration.ofMillis(4L))
- val groupBy = GroupAlsoByWindow[KV, String](_._1, windows)
- val windowRunner = new DefaultWindowRunner(taskContext, userConfig, groupBy)
-
- data.foreach(windowRunner.process)
- windowRunner.trigger(Watermark.MAX)
-
- verify(taskContext, times(2)).output(Message(Some(("foo", 1)), Watermark.MAX))
- verify(taskContext).output(Message(Some(("foo", 2)), Watermark.MAX))
- verify(taskContext, times(2)).output(Message(Some(("bar", 2)), Watermark.MAX))
- verify(taskContext).output(Message(Some(("bar", 1)), Watermark.MAX))
+ val windows = SessionWindows.apply(Duration.ofMillis(4L))
+ val windowRunner = new DefaultWindowRunner[KV, Option[KV]](windows,
+ new FoldRunner[KV, Option[KV]](reduce, "reduce"))
+
+ data.foreach(m => windowRunner.process(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp)))
+ windowRunner.trigger(Watermark.MAX).toList shouldBe
+ List(
+ TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(4)),
+ TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(18)),
+ TimestampedValue(Some(("foo", 2)), Instant.ofEpochMilli(29))
+ )
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala
new file mode 100644
index 0000000..038f91d
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.partitioner.GroupByPartitionerSpec.People
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+ it should "group by message payload and window" in {
+ val mark = People("Mark", "male")
+ val tom = People("Tom", "male")
+ val michelle = People("Michelle", "female")
+
+ val partitionNum = 10
+
+ val groupBy = new GroupByPartitioner[People, String](_.gender)
+ groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe
+ groupBy.getPartition(Message(tom, 2L), partitionNum)
+
+ groupBy.getPartition(Message(mark, 2L), partitionNum) should not be
+ groupBy.getPartition(Message(michelle, 3L), partitionNum)
+ }
+}
+
+object GroupByPartitionerSpec {
+ case class People(name: String, gender: String)
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index 7651251..f7a3a63 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -23,8 +23,7 @@ import java.time.Instant
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
@@ -33,21 +32,16 @@ import org.scalatest.prop.PropertyChecks
class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
- private val runnerGen = {
- val runner = mock[FunctionRunner[Any, Any]]
- Gen.oneOf(Some(runner), None)
- }
-
property("DataSourceTask should setup data source") {
- forAll(runnerGen, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) {
- (runner: Option[FunctionRunner[Any, Any]], startTime: Instant) =>
+ forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) {
+ (startTime: Instant) =>
val taskContext = MockUtil.mockTaskContext
implicit val system = MockUtil.system
val dataSource = mock[DataSource]
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
- val transform = new Transform[Any, Any](taskContext, runner)
- val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
+ val runner = mock[WindowRunner[Any, Any]]
+ val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
sourceTask.onStart(startTime)
@@ -56,21 +50,20 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
}
property("DataSourceTask should read from DataSource and transform inputs") {
- forAll(runnerGen, Gen.alphaStr, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) {
- (runner: Option[FunctionRunner[Any, Any]], str: String, timestamp: Instant) =>
+ forAll(Gen.alphaStr, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) {
+ (str: String, timestamp: Instant) =>
val taskContext = MockUtil.mockTaskContext
implicit val system = MockUtil.system
val dataSource = mock[DataSource]
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
- val transform = new Transform[Any, Any](taskContext, runner)
- val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
+ val runner = mock[WindowRunner[Any, Any]]
+ val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
val msg = Message(str, timestamp)
when(dataSource.read()).thenReturn(msg)
- runner.foreach(r => {
- when(r.process(str)).thenReturn(Some(str))
- when(r.finish()).thenReturn(None)
- })
+
+ when(runner.trigger(Watermark.MAX)).thenReturn(
+ Some(TimestampedValue(str.asInstanceOf[Any], timestamp)))
sourceTask.onNext(Message("next"))
sourceTask.onWatermarkProgress(Watermark.MAX)
@@ -80,18 +73,16 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
}
property("DataSourceTask should teardown DataSource") {
- forAll(runnerGen) { (runner: Option[FunctionRunner[Any, Any]]) =>
- val taskContext = MockUtil.mockTaskContext
- implicit val system = MockUtil.system
- val dataSource = mock[DataSource]
- val config = UserConfig.empty
- .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
- val transform = new Transform[Any, Any](taskContext, runner)
- val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
+ val taskContext = MockUtil.mockTaskContext
+ implicit val system = MockUtil.system
+ val dataSource = mock[DataSource]
+ val config = UserConfig.empty
+ .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
+ val runner = mock[WindowRunner[Any, Any]]
+ val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
- sourceTask.onStop()
+ sourceTask.onStop()
- verify(dataSource).close()
- }
+ verify(dataSource).close()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index fb0beaa..65cb17a 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -24,9 +24,10 @@ import java.util.Random
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{FlatSpec, Matchers}
-import org.apache.gearpump.{MAX_TIME_MILLIS, Message}
+import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.source.Watermark
import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription}
@@ -115,7 +116,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
subscription.sendMessage(Message(randomMessage, clock))
}
- assert(subscription.allowSendingMoreMessages() == false)
+ assert(!subscription.allowSendingMoreMessages())
}
it should "report minClock as Long.MaxValue when there is no pending message" in {
@@ -124,7 +125,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
subscription.sendMessage(msg1)
assert(subscription.minClock == 70)
subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
- assert(subscription.minClock == MAX_TIME_MILLIS)
+ assert(subscription.minClock == Watermark.MAX.toEpochMilli)
}
private def randomMessage: String = new Random().nextInt.toString
[2/2] incubator-gearpump git commit: [GEARPUMP-316] Decouple groupBy
from window
Posted by ma...@apache.org.
[GEARPUMP-316] Decouple groupBy from window
This includes the following major changes to the `Stream DSL`:
1. Decouple groupBy from window. Previously, `window` was required before `groupBy`; now users can write `window.groupBy` or `groupBy.window`, or even `window.groupBy.window`.
2. Correspondingly, `GroupByOp` is split into `GroupByOp` and `WindowOp`. When chaining `Op`s, intermediate `Op`s such as `WindowTransformOp` and `TransformWindowTransformOp` are generated before they can be translated into a function (wrapping the UDF).
3. The function will be run by a `WindowRunner` in one of `DataSourceTask`, `GroupByTask` or `TransformTask`. Messages are windowed, processed and triggered as defined by the window function and triggers.
4. A Stream is implicitly in a global window. Its window only changes when the user explicitly defines another window. Hence, `groupBy` is equivalent to `globalWindows.groupBy.globalWindows`, and `groupBy.fixedWindows` is equivalent to `globalWindows.groupBy.fixedWindows`.
5. Bug fixes for output time and `Watermark.MAX`
Author: manuzhang <ow...@gmail.com>
Closes #186 from manuzhang/window.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/24e1a454
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/24e1a454
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/24e1a454
Branch: refs/heads/master
Commit: 24e1a4546260ca414ca84f8df055e57f4d963263
Parents: c1370d9
Author: manuzhang <ow...@gmail.com>
Authored: Wed Jun 14 10:05:32 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Jun 14 10:05:32 2017 +0800
----------------------------------------------------------------------
.../materializer/RemoteMaterializerImpl.scala | 22 +-
.../apache/gearpump/streaming/Constants.scala | 1 -
.../streaming/dsl/javaapi/JavaStream.scala | 17 +-
.../dsl/partitioner/GroupByPartitioner.scala | 48 ----
.../apache/gearpump/streaming/dsl/plan/OP.scala | 243 ++++++++++++++++---
.../gearpump/streaming/dsl/plan/Planner.scala | 10 +-
.../dsl/plan/functions/FunctionRunner.scala | 16 +-
.../streaming/dsl/scalaapi/Stream.scala | 70 +++---
.../streaming/dsl/scalaapi/StreamApp.scala | 2 +-
.../streaming/dsl/task/CountTriggerTask.scala | 62 -----
.../dsl/task/EventTimeTriggerTask.scala | 58 -----
.../streaming/dsl/task/GroupByTask.scala | 73 ++++++
.../dsl/task/ProcessingTimeTriggerTask.scala | 81 -------
.../streaming/dsl/task/TransformTask.scala | 56 +----
.../streaming/dsl/window/api/Trigger.scala | 4 -
.../dsl/window/api/WindowFunction.scala | 37 ++-
.../streaming/dsl/window/api/Windows.scala | 43 ++--
.../streaming/dsl/window/impl/Window.scala | 28 ---
.../dsl/window/impl/WindowRunner.scala | 155 ++++++------
.../partitioner/GroupByPartitioner.scala | 47 ++++
.../streaming/source/DataSourceTask.scala | 24 +-
.../gearpump/streaming/source/Watermark.scala | 2 +-
.../gearpump/streaming/task/Subscription.scala | 4 +-
.../partitioner/GroupByPartitionerSpec.scala | 45 ----
.../gearpump/streaming/dsl/plan/OpSpec.scala | 94 +++----
.../streaming/dsl/plan/PlannerSpec.scala | 25 +-
.../dsl/plan/functions/FunctionRunnerSpec.scala | 98 ++------
.../streaming/dsl/scalaapi/StreamAppSpec.scala | 4 +-
.../streaming/dsl/scalaapi/StreamSpec.scala | 7 +-
.../dsl/task/CountTriggerTaskSpec.scala | 61 -----
.../dsl/task/EventTimeTriggerTaskSpec.scala | 66 -----
.../streaming/dsl/task/GroupByTaskSpec.scala | 60 +++++
.../task/ProcessingTimeTriggerTaskSpec.scala | 69 ------
.../streaming/dsl/task/TransformTaskSpec.scala | 62 ++---
.../window/impl/DefaultWindowRunnerSpec.scala | 38 +--
.../partitioner/GroupByPartitionerSpec.scala | 45 ++++
.../streaming/source/DataSourceTaskSpec.scala | 51 ++--
.../streaming/task/SubscriptionSpec.scala | 7 +-
38 files changed, 761 insertions(+), 1074 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
index 74fe077..e2cdbd4 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -35,11 +35,9 @@ import org.apache.gearpump.akkastream.task.{BalanceTask, BatchTask, BroadcastTas
import org.apache.gearpump.akkastream.task.TickSourceTask.{INITIAL_DELAY, INTERVAL, TICK}
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapper
-import org.apache.gearpump.streaming.dsl.plan.{ChainableOp, DataSinkOp, DataSourceOp, Direct, GroupByOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle}
+import org.apache.gearpump.streaming.dsl.plan.{TransformOp, DataSinkOp, DataSourceOp, Direct, GroupByOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle}
import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.window.api.CountWindows
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
import org.apache.gearpump.util.Graph
import org.slf4j.LoggerFactory
@@ -152,10 +150,10 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
val op = module match {
case source: SourceTaskModule[_] =>
val updatedConf = conf.withConfig(source.conf)
- DataSourceOp(source.source, parallelism, updatedConf, "source")
+ DataSourceOp(source.source, parallelism, "source", updatedConf)
case sink: SinkTaskModule[_] =>
val updatedConf = conf.withConfig(sink.conf)
- DataSinkOp(sink.sink, parallelism, updatedConf, "sink")
+ DataSinkOp(sink.sink, parallelism, "sink", updatedConf)
case sourceBridge: SourceBridgeModule[_, _] =>
ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source")
case processor: ProcessorModule[_, _, _] =>
@@ -164,8 +162,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
case sinkBridge: SinkBridgeModule[_, _] =>
ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
case groupBy: GroupByModule[Any, Any] =>
- GroupByOp(GroupAlsoByWindow(groupBy.groupBy, CountWindows.apply[Any](1).accumulating),
- parallelism, "groupBy", conf)
+ GroupByOp(groupBy.groupBy, parallelism, "groupBy", conf)
case reduce: ReduceModule[_] =>
reduceOp(reduce.f, conf)
case graphStage: GraphStageModule =>
@@ -181,7 +178,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
op
}.mapEdge[OpEdge] { (n1, edge, n2) =>
n2 match {
- case chainableOp: ChainableOp[_, _]
+ case chainableOp: TransformOp[_, _]
if !n1.isInstanceOf[ProcessorOp[_]] && !n2.isInstanceOf[ProcessorOp[_]] =>
Direct
case _ =>
@@ -242,8 +239,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
withValue(FoldTask.AGGREGATOR, fold.f)
ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold")
case groupBy: GroupBy[Any, Any] =>
- GroupByOp(GroupAlsoByWindow(groupBy.keyFor, CountWindows.apply[Any](1).accumulating),
- groupBy.maxSubstreams, "groupBy", conf)
+ GroupByOp(groupBy.keyFor, groupBy.maxSubstreams, "groupBy", conf)
case groupedWithin: GroupedWithin[_] =>
val diConf = conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d).
withInt(GroupedWithinTask.BATCH_SIZE, groupedWithin.n)
@@ -285,9 +281,9 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
withInt(MergeTask.INPUT_PORTS, merge.inputPorts)
ProcessorOp(classOf[MergeTask], parallelism, mergeConf, "merge")
case mergePreferred: MergePreferred[_] =>
- MergeOp("mergePreferred", conf)
+ MergeOp()
case mergeSorted: MergeSorted[_] =>
- MergeOp("mergeSorted", conf)
+ MergeOp()
case prefixAndTail: PrefixAndTail[_] =>
// TODO
null
@@ -480,7 +476,7 @@ object RemoteMaterializerImpl {
def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String,
conf: UserConfig): Op = {
- ChainableOp(new FlatMapper(FlatMapFunction[In, Out](fun), description), conf)
+ TransformOp(new FlatMapper(FlatMapFunction[In, Out](fun), description), conf)
}
def conflateOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out,
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
index 7ac1b74..d9cfb92 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -24,7 +24,6 @@ object Constants {
val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator"
val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source"
val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function"
- val GEARPUMP_STREAMING_WINDOW_FUNCTION = "gearpump.streaming.dsl.window-function"
val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities"
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index da0e4db..cb9f084 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -21,7 +21,7 @@ import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction}
import org.apache.gearpump.streaming.dsl.javaapi.functions.{GroupByFunction, FlatMapFunction => JFlatMapFunction}
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream}
+import org.apache.gearpump.streaming.dsl.scalaapi.Stream
import org.apache.gearpump.streaming.dsl.window.api.Windows
import org.apache.gearpump.streaming.task.Task
@@ -59,8 +59,8 @@ class JavaStream[T](val stream: Stream[T]) {
}
/** Merges streams of same type together */
- def merge(other: JavaStream[T], description: String): JavaStream[T] = {
- new JavaStream[T](stream.merge(other.stream, description))
+ def merge(other: JavaStream[T], parallelism: Int, description: String): JavaStream[T] = {
+ new JavaStream[T](stream.merge(other.stream, parallelism, description))
}
/**
@@ -72,8 +72,8 @@ class JavaStream[T](val stream: Stream[T]) {
new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description))
}
- def window(win: Windows[T], description: String): JavaWindowStream[T] = {
- new JavaWindowStream[T](stream.window(win, description))
+ def window(win: Windows): JavaStream[T] = {
+ new JavaStream[T](stream.window(win))
}
/** Add a low level Processor to process messages */
@@ -84,10 +84,3 @@ class JavaStream[T](val stream: Stream[T]) {
}
}
-class JavaWindowStream[T](stream: WindowStream[T]) {
-
- def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
- description: String): JavaStream[T] = {
- new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
deleted file mode 100644
index 3789d4e..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.partitioner
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.partitioner.UnicastPartitioner
-
-/**
- * Partition messages by applying group by function first.
- *
- * For example:
- * {{{
- * case class People(name: String, gender: String)
- *
- * object Test{
- *
- * val groupBy: (People => String) = people => people.gender
- * val partitioner = GroupByPartitioner(groupBy)
- * }
- * }}}
- *
- * @param fn First apply message with groupBy function, then pick the hashCode of the output
- * to do the partitioning. You must define hashCode() for output type of groupBy function.
- */
-class GroupByPartitioner[T, GROUP](fn: T => GROUP) extends UnicastPartitioner {
-
- override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = {
- val hashCode = fn(message.value.asInstanceOf[T]).hashCode()
- (hashCode & Integer.MAX_VALUE) % partitionNum
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 708e0d2..2a45a8f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -20,18 +20,44 @@ package org.apache.gearpump.streaming.dsl.plan
import akka.actor.ActorSystem
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, FunctionRunner}
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, FunctionRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{AndThen => WindowRunnerAT}
import org.apache.gearpump.streaming.{Constants, Processor}
-import org.apache.gearpump.streaming.dsl.task.TransformTask
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.apache.gearpump.streaming.dsl.task.{GroupByTask, TransformTask}
+import org.apache.gearpump.streaming.dsl.window.api.{GlobalWindows, Windows}
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
import org.apache.gearpump.streaming.task.Task
import scala.reflect.ClassTag
+object Op {
+
+ def concatenate(desc1: String, desc2: String): String = {
+ if (desc1 == null || desc1.isEmpty) desc2
+ else if (desc2 == null || desc2.isEmpty) desc1
+ else desc1 + "." + desc2
+ }
+
+ def concatenate(config1: UserConfig, config2: UserConfig): UserConfig = {
+ config1.withConfig(config2)
+ }
+
+ def withGlobalWindowsDummyRunner(op: Op, userConfig: UserConfig,
+ processor: Processor[_ <: Task])(implicit system: ActorSystem): Processor[_ <: Task] = {
+ if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) {
+ op.chain(
+ WindowOp(GlobalWindows()).chain(TransformOp(new DummyRunner[Any]))
+ ).toProcessor
+ } else {
+ processor
+ }
+ }
+
+}
+
/**
* This is a vertex on the logical plan.
*/
@@ -43,7 +69,7 @@ sealed trait Op {
def chain(op: Op)(implicit system: ActorSystem): Op
- def getProcessor(implicit system: ActorSystem): Processor[_ <: Task]
+ def toProcessor(implicit system: ActorSystem): Processor[_ <: Task]
}
/**
@@ -67,7 +93,7 @@ case class ProcessorOp[T <: Task](
throw new OpChainException(this, other)
}
- override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
DefaultProcessor(parallelism, description, userConfig, processor)
}
}
@@ -78,24 +104,40 @@ case class ProcessorOp[T <: Task](
case class DataSourceOp(
dataSource: DataSource,
parallelism: Int = 1,
- userConfig: UserConfig = UserConfig.empty,
- description: String = "source")
+ description: String = "source",
+ userConfig: UserConfig = UserConfig.empty)
extends Op {
override def chain(other: Op)(implicit system: ActorSystem): Op = {
other match {
- case op: ChainableOp[_, _] =>
- DataSourceOp(dataSource, parallelism,
- userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn),
- description)
+ case op: WindowTransformOp[_, _] =>
+ DataSourceOp(
+ dataSource,
+ parallelism,
+ Op.concatenate(description, op.description),
+ Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
+ op.windowRunner),
+ op.userConfig))
+ case op: TransformOp[_, _] =>
+ chain(
+ WindowOp(GlobalWindows()).chain(op))
+ case op: WindowOp =>
+ chain(
+ op.chain(TransformOp(new DummyRunner[Any]())))
+ case op: TransformWindowTransformOp[_, _, _] =>
+ chain(
+ WindowOp(GlobalWindows()).chain(op.transformOp)
+ .chain(op.windowTransformOp))
case _ =>
throw new OpChainException(this, other)
}
}
- override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
- Processor[DataSourceTask[Any, Any]](parallelism, description,
- userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ Op.withGlobalWindowsDummyRunner(this, userConfig,
+ Processor[DataSourceTask[Any, Any]](parallelism, description,
+ userConfig.withValue(Constants.GEARPUMP_STREAMING_SOURCE, dataSource))
+ )
}
}
@@ -105,15 +147,15 @@ case class DataSourceOp(
case class DataSinkOp(
dataSink: DataSink,
parallelism: Int = 1,
- userConfig: UserConfig = UserConfig.empty,
- description: String = "sink")
+ description: String = "sink",
+ userConfig: UserConfig = UserConfig.empty)
extends Op {
override def chain(op: Op)(implicit system: ActorSystem): Op = {
throw new OpChainException(this, op)
}
- override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
DataSinkProcessor(dataSink, parallelism, description)
}
}
@@ -123,7 +165,7 @@ case class DataSinkOp(
* (e.g. flatMap, map, filter, reduce) and further chained
* to another Op to be used
*/
-case class ChainableOp[IN, OUT](
+case class TransformOp[IN, OUT](
fn: FunctionRunner[IN, OUT],
userConfig: UserConfig = UserConfig.empty) extends Op {
@@ -131,25 +173,127 @@ case class ChainableOp[IN, OUT](
override def chain(other: Op)(implicit system: ActorSystem): Op = {
other match {
- case op: ChainableOp[OUT, _] =>
+ case op: TransformOp[OUT, _] =>
// TODO: preserve type info
- ChainableOp(AndThen(fn, op.fn))
+ // f3(f2(f1(in)))
+ // => TransformOp(f1).chain(TransformOp(f2)).chain(TransformOp(f3))
+ // => AndThen(AndThen(f1, f2), f3)
+ TransformOp(
+ AndThen(fn, op.fn),
+ Op.concatenate(userConfig, op.userConfig))
+ case op: WindowOp =>
+ TransformWindowTransformOp(this,
+ WindowTransformOp(new DefaultWindowRunner[OUT, OUT](
+ op.windows, new DummyRunner[OUT]
+ ), op.description, op.userConfig))
+ case op: TransformWindowTransformOp[OUT, _, _] =>
+ TransformWindowTransformOp(TransformOp(
+ AndThen(fn, op.transformOp.fn),
+ Op.concatenate(userConfig, op.transformOp.userConfig)
+ ), op.windowTransformOp)
case _ =>
throw new OpChainException(this, other)
}
}
- override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
- Processor[TransformTask[Any, Any]](1, description,
- userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, fn))
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ WindowOp(GlobalWindows()).chain(this).toProcessor
}
}
/**
- * This represents a Processor with window aggregation
+ * This is an intermediate operation, produced by chaining WindowOp and TransformOp.
+ * Usually, it will be chained to a DataSourceOp, GroupByOp or MergeOp.
+ * Otherwise, it will be translated to a Processor of TransformTask.
*/
-case class GroupByOp[IN, GROUP](
- groupBy: GroupAlsoByWindow[IN, GROUP],
+case class WindowTransformOp[IN, OUT](
+ windowRunner: WindowRunner[IN, OUT],
+ description: String,
+ userConfig: UserConfig) extends Op {
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: WindowTransformOp[OUT, _] =>
+ WindowTransformOp(
+ WindowRunnerAT(windowRunner, op.windowRunner),
+ Op.concatenate(description, op.description),
+ Op.concatenate(userConfig, op.userConfig)
+ )
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
+ Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
+ Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
+ }
+}
+
+/**
+ * This is an intermediate operation, produced by chaining TransformOp and WindowOp.
+ * It will later be chained to a WindowOp, which results in two WindowTransformOps.
+ * Finally, they will be chained to a single WindowTransformOp.
+ */
+case class TransformWindowTransformOp[IN, MIDDLE, OUT](
+ transformOp: TransformOp[IN, MIDDLE],
+ windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op {
+
+ override def description: String = {
+ throw new UnsupportedOperationException(s"description is not supported on $this")
+ }
+
+ override def userConfig: UserConfig = {
+ throw new UnsupportedOperationException(s"userConfig is not supported on $this")
+ }
+
+ override def chain(op: Op)(implicit system: ActorSystem): Op = {
+ throw new UnsupportedOperationException(s"chain is not supported on $this")
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ WindowOp(GlobalWindows()).chain(this).toProcessor
+ }
+}
+
+/**
+ * This represents a window aggregation, together with a following TransformOp
+ */
+case class WindowOp(
+ windows: Windows,
+ userConfig: UserConfig = UserConfig.empty) extends Op {
+
+ override def description: String = windows.description
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: TransformOp[_, _] =>
+ WindowTransformOp(new DefaultWindowRunner(windows, op.fn),
+ Op.concatenate(description, op.description),
+ Op.concatenate(userConfig, op.userConfig))
+ case op: WindowOp =>
+ chain(TransformOp(new DummyRunner[Any])).chain(op.chain(TransformOp(new DummyRunner[Any])))
+ case op: TransformWindowTransformOp[_, _, _] =>
+ WindowTransformOp(new DefaultWindowRunner(windows, op.transformOp.fn),
+ Op.concatenate(description, op.transformOp.description),
+ Op.concatenate(userConfig, op.transformOp.userConfig)).chain(op.windowTransformOp)
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ chain(TransformOp(new DummyRunner[Any])).toProcessor
+ }
+
+}
+
+/**
+ * This represents a Processor with groupBy and window aggregation
+ */
+case class GroupByOp[IN, GROUP] private(
+ groupBy: IN => GROUP,
parallelism: Int = 1,
description: String = "groupBy",
override val userConfig: UserConfig = UserConfig.empty)
@@ -157,36 +301,57 @@ case class GroupByOp[IN, GROUP](
override def chain(other: Op)(implicit system: ActorSystem): Op = {
other match {
- case op: ChainableOp[_, _] =>
- GroupByOp(groupBy, parallelism, description,
- userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+ case op: WindowTransformOp[_, _] =>
+ GroupByOp(
+ groupBy,
+ parallelism,
+ Op.concatenate(description, op.description),
+ Op.concatenate(
+ userConfig
+ .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.windowRunner),
+ op.userConfig))
+ case op: WindowOp =>
+ chain(op.chain(TransformOp(new DummyRunner[Any]())))
case _ =>
throw new OpChainException(this, other)
}
}
- override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
- groupBy.getProcessor(parallelism, description, userConfig)
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ Op.withGlobalWindowsDummyRunner(this, userConfig,
+ Processor[GroupByTask[IN, GROUP, Any]](parallelism, description,
+ userConfig.withValue(Constants.GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupBy)))
}
}
/**
* This represents a Processor transforming merged streams
*/
-case class MergeOp(description: String, userConfig: UserConfig = UserConfig.empty)
+case class MergeOp(
+ parallelism: Int = 1,
+ description: String = "merge",
+ userConfig: UserConfig = UserConfig.empty)
extends Op {
override def chain(other: Op)(implicit system: ActorSystem): Op = {
other match {
- case op: ChainableOp[_, _] =>
- MergeOp(description, userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+ case op: WindowTransformOp[_, _] =>
+ MergeOp(
+ parallelism,
+ description,
+ Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
+ op.windowRunner),
+ op.userConfig))
+ case op: WindowOp =>
+ chain(op.chain(TransformOp(new DummyRunner[Any]())))
case _ =>
throw new OpChainException(this, other)
}
}
- override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
- Processor[TransformTask[Any, Any]](1, description, userConfig)
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ Op.withGlobalWindowsDummyRunner(this, userConfig,
+ Processor[TransformTask[Any, Any]](parallelism, description, userConfig))
}
}
@@ -198,7 +363,7 @@ trait OpEdge
/**
* The upstream OP and downstream OP doesn't require network data shuffle.
- * e.g. ChainableOp
+ * e.g. TransformOp
*/
case object Direct extends OpEdge
@@ -211,4 +376,4 @@ case object Shuffle extends OpEdge
/**
* Runtime exception thrown on chaining.
*/
-class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 cannot be chained by $op2")
\ No newline at end of file
+class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 can't be chained by $op2")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index 1dd8026..b1b39c9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -19,10 +19,8 @@
package org.apache.gearpump.streaming.dsl.plan
import akka.actor.ActorSystem
-
-import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, GroupByPartitioner, HashPartitioner, Partitioner}
import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
import org.apache.gearpump.streaming.task.Task
import org.apache.gearpump.util.Graph
@@ -36,18 +34,18 @@ class Planner {
(implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = {
val graph = optimize(dag)
- graph.mapEdge { (node1, edge, node2) =>
+ graph.mapEdge { (_, edge, node2) =>
edge match {
case Shuffle =>
node2 match {
case op: GroupByOp[_, _] =>
- new GroupByPartitioner(op.groupBy.groupByFn)
+ new GroupByPartitioner(op.groupBy)
case _ => new HashPartitioner
}
case Direct =>
new CoLocationPartitioner
}
- }.mapVertex(_.getProcessor)
+ }.mapVertex(_.toProcessor)
}
private def optimize(dag: Graph[Op, OpEdge])
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
index c27300f..2c11238 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
@@ -17,16 +17,9 @@
*/
package org.apache.gearpump.streaming.dsl.plan.functions
-import org.apache.gearpump.streaming.dsl.api.functions.{FoldFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.api.functions.FoldFunction
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-object FunctionRunner {
- def withEmitFn[IN, OUT](runner: FunctionRunner[IN, OUT],
- fn: OUT => Unit): FunctionRunner[IN, Unit] = {
- AndThen(runner, new Emit(fn))
- }
-}
-
/**
* Interface to invoke SerializableFunction methods
*
@@ -121,12 +114,9 @@ class FoldRunner[T, A](fn: FoldFunction[T, A], val description: String)
}
}
-class Emit[T](emit: T => Unit) extends FunctionRunner[T, Unit] {
+class DummyRunner[T] extends FunctionRunner[T, T] {
- override def process(value: T): TraversableOnce[Unit] = {
- emit(value)
- None
- }
+ override def process(value: T): TraversableOnce[T] = Option(value)
override def description: String = ""
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index e15d4ae..ef2753e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -25,7 +25,6 @@ import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.plan._
import org.apache.gearpump.streaming.dsl.plan.functions._
import org.apache.gearpump.streaming.dsl.window.api._
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.Graph
@@ -35,7 +34,8 @@ import scala.language.implicitConversions
class Stream[T](
private val graph: Graph[Op, OpEdge], private val thisNode: Op,
- private val edge: Option[OpEdge] = None) {
+ private val edge: Option[OpEdge] = None,
+ private val windows: Windows = GlobalWindows()) {
/**
* Returns a new stream by applying a flatMap function to each element
@@ -108,11 +108,7 @@ class Stream[T](
* @return a new stream after fold
*/
def fold[A](fn: FoldFunction[T, A], description: String): Stream[A] = {
- if (graph.vertices.exists(_.isInstanceOf[GroupByOp[_, _]])) {
- transform(new FoldRunner(fn, description))
- } else {
- throw new UnsupportedOperationException("fold operation can only be applied on window")
- }
+ transform(new FoldRunner(fn, description))
}
/**
@@ -138,10 +134,10 @@ class Stream[T](
}
private def transform[R](fn: FunctionRunner[T, R]): Stream[R] = {
- val op = ChainableOp(fn)
+ val op = TransformOp(fn)
graph.addVertex(op)
graph.addEdge(thisNode, edge.getOrElse(Direct), op)
- new Stream(graph, op)
+ new Stream(graph, op, None, windows)
}
/**
@@ -160,12 +156,13 @@ class Stream[T](
* @param other the other stream
* @return the merged stream
*/
- def merge(other: Stream[T], description: String = "merge"): Stream[T] = {
- val mergeOp = MergeOp(description, UserConfig.empty)
+ def merge(other: Stream[T], parallelism: Int = 1, description: String = "merge"): Stream[T] = {
+ val mergeOp = MergeOp(parallelism, description, UserConfig.empty)
graph.addVertex(mergeOp)
graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
- new Stream[T](graph, mergeOp)
+ val winOp = Stream.addWindowOp(graph, mergeOp, windows)
+ new Stream[T](graph, winOp, None, windows)
}
/**
@@ -184,23 +181,26 @@ class Stream[T](
* @param fn Group by function
* @param parallelism Parallelism level
* @param description The description
- * @return the grouped stream
+ * @return the grouped stream
*/
def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
description: String = "groupBy"): Stream[T] = {
- window(GlobalWindows())
- .groupBy[GROUP](fn, parallelism, description)
+ val gbOp = GroupByOp(fn, parallelism, description)
+ graph.addVertex(gbOp)
+ graph.addEdge(thisNode, edge.getOrElse(Shuffle), gbOp)
+ val winOp = Stream.addWindowOp(graph, gbOp, windows)
+ new Stream(graph, winOp, None, windows)
}
/**
* Window function
*
- * @param win window definition
- * @param description window description
- * @return [[WindowStream]] where groupBy could be applied
+ * @param windows window definition
+ * @return the windowed [[Stream]]
*/
- def window(win: Windows[T], description: String = "window"): WindowStream[T] = {
- new WindowStream[T](graph, edge, thisNode, win, description)
+ def window(windows: Windows): Stream[T] = {
+ val winOp = Stream.addWindowOp(graph, thisNode, windows)
+ new Stream(graph, winOp, None, windows)
}
/**
@@ -216,22 +216,10 @@ class Stream[T](
val processorOp = ProcessorOp(processor, parallelism, conf, description)
graph.addVertex(processorOp)
graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
- new Stream[R](graph, processorOp, Some(Shuffle))
+ new Stream[R](graph, processorOp, Some(Shuffle), windows)
}
-}
-class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op,
- window: Windows[T], winDesc: String) {
- def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
- description: String = "groupBy"): Stream[T] = {
- val groupBy = GroupAlsoByWindow(fn, window)
- val groupOp = GroupByOp[T, GROUP](groupBy, parallelism,
- s"$winDesc.$description")
- graph.addVertex(groupOp)
- graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
- new Stream[T](graph, groupOp)
- }
}
class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
@@ -263,8 +251,9 @@ class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
object Stream {
- def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = {
- new Stream[T](graph, node, edge)
+ def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge],
+ windows: Windows): Stream[T] = {
+ new Stream[T](graph, node, edge, windows)
}
def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
@@ -272,6 +261,13 @@ object Stream {
def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V]
= (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+ def addWindowOp(graph: Graph[Op, OpEdge], op: Op, win: Windows): Op = {
+ val winOp = WindowOp(win)
+ graph.addVertex(winOp)
+ graph.addEdge(op, Direct, winOp)
+ winOp
+ }
+
implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = {
new KVStream(stream)
}
@@ -279,10 +275,10 @@ object Stream {
implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
def sink(dataSink: DataSink, parallelism: Int = 1,
conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = {
- implicit val sink = DataSinkOp(dataSink, parallelism, conf, description)
+ implicit val sink = DataSinkOp(dataSink, parallelism, description, conf)
stream.graph.addVertex(sink)
stream.graph.addEdge(stream.thisNode, Shuffle, sink)
- new Stream[T](stream.graph, sink)
+ new Stream[T](stream.graph, sink, None, stream.windows)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
index 6378a18..bce8c0c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -78,7 +78,7 @@ object StreamApp {
def source[T](dataSource: DataSource, parallelism: Int = 1,
conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
- implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description)
+ implicit val sourceOp = DataSourceOp(dataSource, parallelism, description, conf)
app.graph.addVertex(sourceOp)
new Stream[T](app.graph, sourceOp)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
deleted file mode 100644
index 0dc28eb..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.Instant
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.window.api.CountWindowFunction
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-
-/**
- * This task triggers output on number of messages in a window.
- */
-class CountTriggerTask[IN, GROUP](
- groupBy: GroupAlsoByWindow[IN, GROUP],
- windowRunner: WindowRunner,
- taskContext: TaskContext,
- userConfig: UserConfig)
- extends Task(taskContext, userConfig) {
-
- def this(groupBy: GroupAlsoByWindow[IN, GROUP],
- taskContext: TaskContext, userConfig: UserConfig) = {
- this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
- taskContext, userConfig)
- }
-
- def this(taskContext: TaskContext, userConfig: UserConfig) = {
- this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
- GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
- taskContext, userConfig)
- }
-
- private val windowSize = groupBy.window.windowFn.asInstanceOf[CountWindowFunction[IN]].size
- private var num = 0
-
- override def onNext(msg: Message): Unit = {
- windowRunner.process(msg)
- num += 1
- if (windowSize == num) {
- windowRunner.trigger(Instant.ofEpochMilli(windowSize))
- num = 0
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
deleted file mode 100644
index 0674339..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.Instant
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-
-/**
- * This task triggers output on watermark progress.
- */
-class EventTimeTriggerTask[IN, GROUP](
- groupBy: GroupAlsoByWindow[IN, GROUP],
- windowRunner: WindowRunner,
- taskContext: TaskContext,
- userConfig: UserConfig)
- extends Task(taskContext, userConfig) {
-
- def this(groupBy: GroupAlsoByWindow[IN, GROUP],
- taskContext: TaskContext, userConfig: UserConfig) = {
- this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
- taskContext, userConfig)
- }
-
- def this(taskContext: TaskContext, userConfig: UserConfig) = {
- this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
- GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
- taskContext, userConfig)
- }
-
- override def onNext(message: Message): Unit = {
- windowRunner.process(message)
- }
-
- override def onWatermarkProgress(watermark: Instant): Unit = {
- windowRunner.trigger(watermark)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
new file mode 100644
index 0000000..8301fb9
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+import java.util.function.Consumer
+
+import com.gs.collections.impl.map.mutable.UnifiedMap
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * Processes messages in groups as defined by the groupBy function.
+ */
+class GroupByTask[IN, GROUP, OUT](
+ groupBy: IN => GROUP,
+ taskContext: TaskContext,
+ userConfig: UserConfig) extends Task(taskContext, userConfig) {
+
+ def this(context: TaskContext, conf: UserConfig) = {
+ this(
+ conf.getValue[IN => GROUP](GEARPUMP_STREAMING_GROUPBY_FUNCTION)(context.system).get,
+ context, conf
+ )
+ }
+
+ private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] =
+ new UnifiedMap[GROUP, WindowRunner[IN, OUT]]
+
+ override def onNext(message: Message): Unit = {
+ val input = message.value.asInstanceOf[IN]
+ val group = groupBy(input)
+
+ if (!groups.containsKey(group)) {
+ groups.put(group,
+ userConfig.getValue[WindowRunner[IN, OUT]](
+ GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get)
+ }
+
+ groups.get(group).process(TimestampedValue(message.value.asInstanceOf[IN],
+ message.timestamp))
+ }
+
+ override def onWatermarkProgress(watermark: Instant): Unit = {
+ groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] {
+ override def accept(runner: WindowRunner[IN, OUT]): Unit = {
+ runner.trigger(watermark).foreach {
+ result =>
+ taskContext.output(Message(result.value, result.timestamp))
+ }
+ }
+ })
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
deleted file mode 100644
index a04e3ca..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.Instant
-import java.util.concurrent.TimeUnit
-
-import akka.actor.Actor.Receive
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
-import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFunction
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-
-import scala.concurrent.duration.FiniteDuration
-
-object ProcessingTimeTriggerTask {
- case object Triggering
-}
-
-/**
- * This task triggers output on scheduled system time interval.
- */
-class ProcessingTimeTriggerTask[IN, GROUP](
- groupBy: GroupAlsoByWindow[IN, GROUP],
- windowRunner: WindowRunner,
- taskContext: TaskContext,
- userConfig: UserConfig)
- extends Task(taskContext, userConfig) {
-
- def this(groupBy: GroupAlsoByWindow[IN, GROUP],
- taskContext: TaskContext, userConfig: UserConfig) = {
- this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
- taskContext, userConfig)
- }
-
- def this(taskContext: TaskContext, userConfig: UserConfig) = {
- this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
- GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
- taskContext, userConfig)
- }
-
- private val windowFn = groupBy.window.windowFn.asInstanceOf[SlidingWindowFunction[IN]]
- private val windowSizeMs = windowFn.size.toMillis
- private val windowStepMs = windowFn.step.toMillis
-
- override def onStart(startTime: Instant): Unit = {
- val initialDelay = windowSizeMs - Instant.now.toEpochMilli % windowSizeMs
- taskContext.scheduleOnce(
- new FiniteDuration(initialDelay, TimeUnit.MILLISECONDS))(self ! Triggering)
- }
-
- override def onNext(message: Message): Unit = {
- windowRunner.process(message)
- }
-
- override def receiveUnManagedMessage: Receive = {
- case Triggering =>
- windowRunner.trigger(Instant.now)
- taskContext.scheduleOnce(
- new FiniteDuration(windowStepMs, TimeUnit.MILLISECONDS))(self ! Triggering)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index 572df94..6a455a5 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -22,58 +22,28 @@ import java.time.Instant
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
import org.apache.gearpump.streaming.task.{Task, TaskContext}
-object TransformTask {
-
- class Transform[IN, OUT](taskContext: TaskContext,
- processor: Option[FunctionRunner[IN, OUT]],
- private var buffer: Vector[Message] = Vector.empty[Message]) {
-
- def onNext(msg: Message): Unit = {
- buffer +:= msg
- }
-
- def onWatermarkProgress(watermark: Instant): Unit = {
- var nextBuffer = Vector.empty[Message]
- processor.foreach(_.setup())
- buffer.foreach { message: Message =>
- if (message.timestamp.isBefore(watermark)) {
- processor match {
- case Some(p) =>
- FunctionRunner
- .withEmitFn(p, (out: OUT) => taskContext.output(Message(out, message.timestamp)))
- // .toList forces eager evaluation
- .process(message.value.asInstanceOf[IN]).toList
- case None =>
- taskContext.output(message)
- }
- } else {
- nextBuffer +:= message
- }
- }
- processor.foreach(_.teardown())
- buffer = nextBuffer
- }
- }
-
-}
-
-class TransformTask[IN, OUT](transform: Transform[IN, OUT],
+class TransformTask[IN, OUT](
+ runner: WindowRunner[IN, OUT],
taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
- def this(taskContext: TaskContext, userConf: UserConfig) = {
- this(new Transform(taskContext, userConf.getValue[FunctionRunner[IN, OUT]](
- GEARPUMP_STREAMING_OPERATOR)(taskContext.system)), taskContext, userConf)
+ def this(context: TaskContext, conf: UserConfig) = {
+ this(
+ conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
+ context, conf
+ )
}
override def onNext(msg: Message): Unit = {
- transform.onNext(msg)
+ runner.process(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp))
}
override def onWatermarkProgress(watermark: Instant): Unit = {
- transform.onWatermarkProgress(watermark)
+ runner.trigger(watermark).foreach {
+ result =>
+ taskContext.output(Message(result.value, result.timestamp))
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
index 9865e18..02d52a0 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
@@ -21,7 +21,3 @@ sealed trait Trigger
case object EventTimeTrigger extends Trigger
-case object ProcessingTimeTrigger extends Trigger
-
-case object CountTrigger extends Trigger
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
index 7da9c85..a2f51c7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -32,35 +32,39 @@ object WindowFunction {
}
}
-trait WindowFunction[T] {
+trait WindowFunction {
- def apply(context: WindowFunction.Context[T]): Array[Window]
+ def apply[T](context: WindowFunction.Context[T]): Array[Window]
def isNonMerging: Boolean
}
-abstract class NonMergingWindowFunction[T] extends WindowFunction[T] {
+abstract class NonMergingWindowFunction extends WindowFunction {
override def isNonMerging: Boolean = true
}
-case class GlobalWindowFunction[T]() extends NonMergingWindowFunction[T] {
+object GlobalWindowFunction {
- override def apply(context: WindowFunction.Context[T]): Array[Window] = {
- Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS),
- Instant.ofEpochMilli(MAX_TIME_MILLIS)))
- }
+ val globalWindow = Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS),
+ Instant.ofEpochMilli(MAX_TIME_MILLIS)))
+}
+case class GlobalWindowFunction() extends NonMergingWindowFunction {
+
+ override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
+ GlobalWindowFunction.globalWindow
+ }
}
-case class SlidingWindowFunction[T](size: Duration, step: Duration)
- extends NonMergingWindowFunction[T] {
+case class SlidingWindowFunction(size: Duration, step: Duration)
+ extends NonMergingWindowFunction {
def this(size: Duration) = {
this(size, size)
}
- override def apply(context: WindowFunction.Context[T]): Array[Window] = {
+ override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
val timestamp = context.timestamp
val sizeMillis = size.toMillis
val stepMillis = step.toMillis
@@ -81,16 +85,9 @@ case class SlidingWindowFunction[T](size: Duration, step: Duration)
}
}
-case class CountWindowFunction[T](size: Int) extends NonMergingWindowFunction[T] {
-
- override def apply(context: WindowFunction.Context[T]): Array[Window] = {
- Array(Window.ofEpochMilli(0, size))
- }
-}
-
-case class SessionWindowFunction[T](gap: Duration) extends WindowFunction[T] {
+case class SessionWindowFunction(gap: Duration) extends WindowFunction {
- override def apply(context: WindowFunction.Context[T]): Array[Window] = {
+ override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
Array(Window(context.timestamp, context.timestamp.plus(gap)))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
index 467f57c..d53bc96 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
@@ -20,42 +20,35 @@ package org.apache.gearpump.streaming.dsl.window.api
import java.time.Duration
/**
- *
* Defines how to apply window functions.
*
* @param windowFn how to divide windows
* @param trigger when to trigger window result
* @param accumulationMode whether to accumulate results across windows
*/
-case class Windows[T](
- windowFn: WindowFunction[T],
+case class Windows(
+ windowFn: WindowFunction,
trigger: Trigger = EventTimeTrigger,
- accumulationMode: AccumulationMode = Discarding) {
-
- def triggering(trigger: Trigger): Windows[T] = {
- Windows(windowFn, trigger)
- }
+ accumulationMode: AccumulationMode = Discarding,
+ description: String) {
- def accumulating: Windows[T] = {
- Windows(windowFn, trigger, Accumulating)
+ def triggering(trigger: Trigger): Windows = {
+ Windows(windowFn, trigger, accumulationMode, description)
}
- def discarding: Windows[T] = {
- Windows(windowFn, trigger, Discarding)
+ def accumulating: Windows = {
+ Windows(windowFn, trigger, Accumulating, description)
}
-}
-
-object CountWindows {
- def apply[T](size: Int): Windows[T] = {
- Windows(CountWindowFunction(size), CountTrigger)
+ def discarding: Windows = {
+ Windows(windowFn, trigger, Discarding, description)
}
}
object GlobalWindows {
- def apply[T](): Windows[T] = {
- Windows(GlobalWindowFunction())
+ def apply(): Windows = {
+ Windows(GlobalWindowFunction(), description = "globalWindows")
}
}
@@ -67,8 +60,8 @@ object FixedWindows {
* @param size window size
* @return a Window definition
*/
- def apply[T](size: Duration): Windows[T] = {
- Windows(SlidingWindowFunction(size, size))
+ def apply(size: Duration): Windows = {
+ Windows(SlidingWindowFunction(size, size), description = "fixedWindows")
}
}
@@ -81,8 +74,8 @@ object SlidingWindows {
* @param step window step to slide forward
* @return a Window definition
*/
- def apply[T](size: Duration, step: Duration): Windows[T] = {
- Windows(SlidingWindowFunction(size, step))
+ def apply(size: Duration, step: Duration): Windows = {
+ Windows(SlidingWindowFunction(size, step), description = "slidingWindows")
}
}
@@ -94,8 +87,8 @@ object SessionWindows {
* @param gap session gap
* @return a Window definition
*/
- def apply[T](gap: Duration): Windows[T] = {
- Windows(SessionWindowFunction(gap))
+ def apply(gap: Duration): Windows = {
+ Windows(SessionWindowFunction(gap), description = "sessionWindows")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index 05ce74e..870c334 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -25,7 +25,6 @@ import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.{Message, TimeStamp}
import org.apache.gearpump.streaming.dsl.window.api._
-import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, EventTimeTriggerTask, ProcessingTimeTriggerTask}
import org.apache.gearpump.streaming.task.Task
object Window {
@@ -65,31 +64,4 @@ case class Window(startTime: Instant, endTime: Instant) extends Comparable[Windo
}
}
-case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T]) {
-
- def groupBy(message: Message): (GROUP, List[Window]) = {
- val ele = message.value.asInstanceOf[T]
- val group = groupByFn(ele)
- val windows = window.windowFn(new WindowFunction.Context[T] {
- override def element: T = ele
- override def timestamp: Instant = message.timestamp
- })
- group -> windows.toList
- }
-
- def getProcessor(parallelism: Int, description: String,
- userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = {
- val config = userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, this)
- window.trigger match {
- case CountTrigger =>
- Processor[CountTriggerTask[T, GROUP]](parallelism, description, config)
- case ProcessingTimeTrigger =>
- Processor[ProcessingTimeTriggerTask[T, GROUP]](parallelism, description, config)
- case EventTimeTrigger =>
- Processor[EventTimeTriggerTask[T, GROUP]](parallelism, description, config)
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index f392f70..2025618 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -19,133 +19,124 @@ package org.apache.gearpump.streaming.dsl.window.impl
import java.time.Instant
-import akka.actor.ActorSystem
import com.gs.collections.api.block.predicate.Predicate
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import com.gs.collections.api.block.procedure.{Procedure, Procedure2}
+import com.gs.collections.api.block.procedure.Procedure
import com.gs.collections.impl.list.mutable.FastList
-import com.gs.collections.impl.map.mutable.UnifiedMap
import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
-import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.window.api.Discarding
-import org.apache.gearpump.streaming.task.TaskContext
-import org.apache.gearpump.util.LogUtil
-import org.slf4j.Logger
+import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
+import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
+import scala.collection.mutable.ArrayBuffer
-trait WindowRunner {
+case class TimestampedValue[T](value: T, timestamp: Instant)
- def process(message: Message): Unit
+trait WindowRunner[IN, OUT] extends java.io.Serializable {
- def trigger(time: Instant): Unit
+ def process(timestampedValue: TimestampedValue[IN]): Unit
+
+ def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]]
}
-object DefaultWindowRunner {
+case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
+ right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
+
+ def process(timestampedValue: TimestampedValue[IN]): Unit = {
+ left.process(timestampedValue)
+ }
- private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
+ def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]] = {
+ left.trigger(time).foreach(right.process)
+ right.trigger(time)
+ }
}
-class DefaultWindowRunner[IN, GROUP, OUT](
- taskContext: TaskContext, userConfig: UserConfig,
- groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
- extends WindowRunner {
-
- private val windowFn = groupBy.window.windowFn
- private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]]
- private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
- private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean]
-
- override def process(message: Message): Unit = {
- val input = message.value.asInstanceOf[IN]
- val (group, windows) = groupBy.groupBy(message)
- if (!groupedWindowInputs.containsKey(group)) {
- groupedWindowInputs.put(group, new TreeSortedMap[Window, FastList[IN]]())
- }
- val windowInputs = groupedWindowInputs.get(group)
- windows.foreach { win =>
+class DefaultWindowRunner[IN, OUT](
+ windows: Windows,
+ fnRunner: FunctionRunner[IN, OUT])
+ extends WindowRunner[IN, OUT] {
+
+ private val windowFn = windows.windowFn
+ private val windowInputs = new TreeSortedMap[Window, FastList[TimestampedValue[IN]]]
+ private var setup = false
+
+ override def process(timestampedValue: TimestampedValue[IN]): Unit = {
+ val wins = windowFn(new Context[IN] {
+ override def element: IN = timestampedValue.value
+
+ override def timestamp: Instant = timestampedValue.timestamp
+ })
+ wins.foreach { win =>
if (windowFn.isNonMerging) {
if (!windowInputs.containsKey(win)) {
- val inputs = new FastList[IN](1)
+ val inputs = new FastList[TimestampedValue[IN]]
windowInputs.put(win, inputs)
}
- windowInputs.get(win).add(input)
+ windowInputs.get(win).add(timestampedValue)
} else {
- merge(windowInputs, win, input)
+ merge(windowInputs, win, timestampedValue)
}
}
- if (!groupedFnRunners.containsKey(group)) {
- val runner = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
- groupedFnRunners.put(group, runner)
- groupedRunnerSetups.put(group, false)
- }
-
- def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input: IN): Unit = {
- val intersected = windowInputs.keySet.select(new Predicate[Window] {
+ def merge(
+ winIns: TreeSortedMap[Window, FastList[TimestampedValue[IN]]],
+ win: Window, tv: TimestampedValue[IN]): Unit = {
+ val intersected = winIns.keySet.select(new Predicate[Window] {
override def accept(each: Window): Boolean = {
win.intersects(each)
}
})
var mergedWin = win
- val mergedInputs = FastList.newListWith(input)
+ val mergedInputs = FastList.newListWith(tv)
intersected.forEach(new Procedure[Window] {
override def value(each: Window): Unit = {
mergedWin = mergedWin.span(each)
- mergedInputs.addAll(windowInputs.remove(each))
+ mergedInputs.addAll(winIns.remove(each))
}
})
- windowInputs.put(mergedWin, mergedInputs)
+ winIns.put(mergedWin, mergedInputs)
}
-
}
- override def trigger(time: Instant): Unit = {
- groupedWindowInputs.forEachKeyValue(new Procedure2[GROUP, TreeSortedMap[Window, FastList[IN]]] {
- override def value(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = {
- onTrigger(group, windowInputs)
- }
- })
-
+ override def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]] = {
@annotation.tailrec
- def onTrigger(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = {
+ def onTrigger(
+ outputs: ArrayBuffer[TimestampedValue[OUT]]): TraversableOnce[TimestampedValue[OUT]] = {
if (windowInputs.notEmpty()) {
val firstWin = windowInputs.firstKey
if (!time.isBefore(firstWin.endTime)) {
val inputs = windowInputs.remove(firstWin)
- if (groupedFnRunners.containsKey(group)) {
- val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
- (output: OUT) => {
- taskContext.output(Message(output, time))
- })
- val setup = groupedRunnerSetups.get(group)
- if (!setup) {
- runner.setup()
- groupedRunnerSetups.put(group, true)
- }
- inputs.forEach(new Procedure[IN] {
- override def value(t: IN): Unit = {
- // .toList forces eager evaluation
- runner.process(t).toList
+ if (!setup) {
+ fnRunner.setup()
+ setup = true
+ }
+ inputs.forEach(new Procedure[TimestampedValue[IN]] {
+ override def value(tv: TimestampedValue[IN]): Unit = {
+ fnRunner.process(tv.value).foreach {
+ out: OUT => outputs += TimestampedValue(out, tv.timestamp)
}
- })
- // .toList forces eager evaluation
- runner.finish().toList
- if (groupBy.window.accumulationMode == Discarding) {
- runner.teardown()
- groupedRunnerSetups.put(group, false)
- // dicarding, setup need to be called for each window
- onTrigger(group, windowInputs)
- } else {
- // accumulating, setup is only called for the first window
- onTrigger(group, windowInputs)
}
+ })
+ fnRunner.finish().foreach {
+ out: OUT => outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1))
+ }
+ if (windows.accumulationMode == Discarding) {
+ fnRunner.teardown()
+ setup = false
+ // discarding, setup needs to be called for each window
+ onTrigger(outputs)
} else {
- throw new RuntimeException(s"FunctionRunner not found for group $group")
+ // accumulating, setup is only called for the first window
+ onTrigger(outputs)
}
+ } else {
+ outputs
}
+ } else {
+ outputs
}
}
+
+ onTrigger(ArrayBuffer.empty[TimestampedValue[OUT]])
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitioner.scala
new file mode 100644
index 0000000..c2ddb0d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitioner.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+/**
+ * Partitions messages by first applying the groupBy function.
+ *
+ * For example:
+ * {{{
+ * case class People(name: String, gender: String)
+ *
+ * object Test{
+ *
+ * val groupBy: (People => String) = people => people.gender
+ * val partitioner = new GroupByPartitioner(groupBy)
+ * }
+ * }}}
+ *
+ * @param fn groupBy function applied to each message value; the hashCode of its output
+ * selects the partition, so the output type of the groupBy function must define hashCode().
+ */
+class GroupByPartitioner[T, GROUP](fn: T => GROUP) extends UnicastPartitioner {
+
+ override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+ val hashCode = fn(message.value.asInstanceOf[T]).hashCode()
+ (hashCode & Integer.MAX_VALUE) % partitionNum
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index ff1b2d4..74b0cc2 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -23,8 +23,7 @@ import java.time.Instant
import org.apache.gearpump._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
import org.apache.gearpump.streaming.task.{Task, TaskContext}
/**
@@ -40,17 +39,17 @@ import org.apache.gearpump.streaming.task.{Task, TaskContext}
* - `DataSource.close()` in `onStop`
*/
class DataSourceTask[IN, OUT] private[source](
- context: TaskContext,
- conf: UserConfig,
source: DataSource,
- transform: Transform[IN, OUT])
+ windowRunner: WindowRunner[IN, OUT],
+ context: TaskContext,
+ conf: UserConfig)
extends Task(context, conf) {
def this(context: TaskContext, conf: UserConfig) = {
- this(context, conf,
+ this(
conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get,
- new Transform[IN, OUT](context,
- conf.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system))
+ conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
+ context, conf
)
}
@@ -65,14 +64,19 @@ class DataSourceTask[IN, OUT] private[source](
override def onNext(m: Message): Unit = {
0.until(batchSize).foreach { _ =>
- Option(source.read()).foreach(transform.onNext)
+ Option(source.read()).foreach(
+ msg => windowRunner.process(
+ TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)))
}
self ! Watermark(source.getWatermark)
}
override def onWatermarkProgress(watermark: Instant): Unit = {
- transform.onWatermarkProgress(watermark)
+ windowRunner.trigger(watermark).foreach {
+ result =>
+ context.output(Message(result.value, result.timestamp))
+ }
}
override def onStop(): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 0ec2b6f..14abff8 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -30,7 +30,7 @@ case class Watermark(instant: Instant) {
object Watermark {
- val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS)
+ val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS + 1)
val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index 79bcc2a..44ec2c6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -57,8 +57,8 @@ class Subscription(
private val pendingMessageCount: Array[Short] = new Array[Short](parallelism)
private val candidateMinClockSince: Array[Short] = new Array[Short](parallelism)
- private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(MAX_TIME_MILLIS)
- private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(MAX_TIME_MILLIS)
+ private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue)
+ private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue)
private var maxPendingCount: Short = 0
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
deleted file mode 100644
index 1934d14..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.partitioner
-
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People
-
-class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-
- it should "group by message payload and window" in {
- val mark = People("Mark", "male")
- val tom = People("Tom", "male")
- val michelle = People("Michelle", "female")
-
- val partitionNum = 10
-
- val groupBy = new GroupByPartitioner[People, String](_.gender)
- groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe
- groupBy.getPartition(Message(tom, 2L), partitionNum)
-
- groupBy.getPartition(Message(mark, 2L), partitionNum) should not be
- groupBy.getPartition(Message(michelle, 3L), partitionNum)
- }
-}
-
-object GroupByPartitionerSpec {
- case class People(name: String, gender: String)
-}