You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/09/06 07:19:07 UTC
incubator-gearpump git commit: [GEARPUMP-203] Use DataSourceTask and
DataSinkTask for DSL
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 5bf7c7cb6 -> 584a2ca23
[GEARPUMP-203] Use DataSourceTask and DataSinkTask for DSL
Author: manuzhang <ow...@gmail.com>
Closes #80 from manuzhang/window_dsl.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/584a2ca2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/584a2ca2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/584a2ca2
Branch: refs/heads/master
Commit: 584a2ca23180add9f7454334907a10c8144565de
Parents: 5bf7c7c
Author: manuzhang <ow...@gmail.com>
Authored: Tue Sep 6 15:19:00 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Sep 6 15:19:00 2016 +0800
----------------------------------------------------------------------
.../gearpump/streaming/javaapi/Processor.java | 3 +-
.../apache/gearpump/streaming/Constants.scala | 3 +-
.../streaming/dsl/plan/OpTranslator.scala | 92 +++-----------------
.../gearpump/streaming/dsl/plan/Planner.scala | 4 +-
.../streaming/source/DataSourceProcessor.scala | 9 +-
.../streaming/source/DataSourceTask.scala | 46 +++++++---
.../gearpump/streaming/dsl/StreamAppSpec.scala | 4 +-
.../gearpump/streaming/dsl/StreamSpec.scala | 9 +-
.../streaming/dsl/plan/OpTranslatorSpec.scala | 25 +++---
.../streaming/source/DataSourceTaskSpec.scala | 6 +-
10 files changed, 77 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java
index 8757081..59b375f 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java
@@ -73,7 +73,8 @@ public class Processor<T extends org.apache.gearpump.streaming.task.Task> implem
* @return the new created source processor
*/
public static Processor<DataSourceTask> source(DataSource source, int parallelism, String description, UserConfig taskConf, ActorSystem system) {
- org.apache.gearpump.streaming.Processor<DataSourceTask> p = DataSourceProcessor.apply(source, parallelism, description, taskConf, system);
+ org.apache.gearpump.streaming.Processor<DataSourceTask<Object, Object>> p =
+ DataSourceProcessor.apply(source, parallelism, description, taskConf, system);
return new Processor(p);
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
index 320e46f..cd33b50 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -20,8 +20,7 @@ package org.apache.gearpump.streaming
object Constants {
val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator"
- val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.dsl.source"
- val GEARPUMP_STREAMING_SINK = "gearpump.streaming.dsl.sink"
+ val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source"
val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function"
val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities"
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
index 6bd0da2..8de291c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
@@ -18,8 +18,6 @@
package org.apache.gearpump.streaming.dsl.plan
-import java.time.Instant
-
import scala.collection.TraversableOnce
import akka.actor.ActorSystem
import org.slf4j.Logger
@@ -30,8 +28,8 @@ import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.Processor.DefaultProcessor
import org.apache.gearpump.streaming.dsl.op._
import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.sink.DataSinkProcessor
+import org.apache.gearpump.streaming.source.DataSourceTask
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.LogUtil
@@ -52,26 +50,24 @@ class OpTranslator extends java.io.Serializable {
val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)
op match {
- case DataSourceOp(dataSource, parallism, conf, description) =>
- Processor[SourceTask[Object, Object]](parallism,
+ case DataSourceOp(dataSource, parallelism, conf, description) =>
+ Processor[DataSourceTask[Any, Any]](parallelism,
description = description + "." + func.description,
userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
- case groupby@GroupByOp(_, parallism, description, _) =>
- Processor[GroupByTask[Object, Object, Object]](parallism,
+ case groupby@GroupByOp(_, parallelism, description, _) =>
+ Processor[GroupByTask[Object, Object, Object]](parallelism,
description = description + "." + func.description,
userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby))
case merge: MergeOp =>
Processor[TransformTask[Object, Object]](1,
description = op.description + "." + func.description,
userConfig)
- case ProcessorOp(processor, parallism, conf, description) =>
- DefaultProcessor(parallism,
+ case ProcessorOp(processor, parallelism, conf, description) =>
+ DefaultProcessor(parallelism,
description = description + "." + func.description,
userConfig, processor)
case DataSinkOp(dataSink, parallelism, conf, description) =>
- Processor[SinkTask[Object]](parallelism,
- description = description + func.description,
- userConfig.withValue(GEARPUMP_STREAMING_SINK, dataSink))
+ DataSinkProcessor(dataSink, parallelism, description + func.description)
}
case op: SlaveOp[_] =>
val func = toFunction(ops.ops)
@@ -156,7 +152,7 @@ object OpTranslator {
class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String)
extends SingleInputFunction[T, T] {
- private var state: Any = null
+ private var state: Any = _
override def process(value: T): TraversableOnce[T] = {
if (state == null) {
@@ -200,50 +196,6 @@ object OpTranslator {
}
}
- class SourceTask[T, OUT](
- source: DataSource, operator: Option[SingleInputFunction[T, OUT]], taskContext: TaskContext,
- userConf: UserConfig)
- extends Task(taskContext, userConf) {
-
- def this(taskContext: TaskContext, userConf: UserConfig) = {
- this(
- userConf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(taskContext.system).get,
- userConf.getValue[SingleInputFunction[T, OUT]](GEARPUMP_STREAMING_OPERATOR)(
- taskContext.system),
- taskContext, userConf)
- }
-
- override def onStart(startTime: Instant): Unit = {
- source.open(taskContext, startTime)
- self ! Message("start", System.currentTimeMillis())
- }
-
- override def onNext(msg: Message): Unit = {
- val time = System.currentTimeMillis()
- Option(source.read()).foreach { msg =>
- operator match {
- case Some(operator) =>
- operator match {
- case bad: DummyInputFunction[T] =>
- taskContext.output(msg)
- case _ =>
- operator.process(msg.msg.asInstanceOf[T]).foreach(msg => {
- taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
- })
- }
- case None =>
- taskContext.output(msg)
- }
- }
-
- self ! Message("next", System.currentTimeMillis())
- }
-
- override def onStop(): Unit = {
- source.close()
- }
- }
-
class TransformTask[IN, OUT](
operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
userConf: UserConfig) extends Task(taskContext, userConf) {
@@ -257,8 +209,8 @@ object OpTranslator {
val time = msg.timestamp
operator match {
- case Some(operator) =>
- operator.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
+ case Some(op) =>
+ op.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
}
case None =>
@@ -267,24 +219,4 @@ object OpTranslator {
}
}
- class SinkTask[T](dataSink: DataSink, taskContext: TaskContext, userConf: UserConfig)
- extends Task(taskContext, userConf) {
-
- def this(taskContext: TaskContext, userConf: UserConfig) = {
- this(userConf.getValue[DataSink](GEARPUMP_STREAMING_SINK)(taskContext.system).get,
- taskContext, userConf)
- }
-
- override def onStart(startTime: Instant): Unit = {
- dataSink.open(taskContext)
- }
-
- override def onNext(msg: Message): Unit = {
- dataSink.write(msg)
- }
-
- override def onStop(): Unit = {
- dataSink.close()
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index 3af5e97..f5bbd65 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -29,7 +29,7 @@ import org.apache.gearpump.util.Graph
class Planner {
- /*
+ /**
* Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low
* level Graph API.
*/
@@ -74,7 +74,7 @@ class Planner {
dag.inDegreeOf(node2) == 1 &&
// For processor node, we don't allow it to merge with downstream operators
!node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) {
- val (_, edge, _) = dag.outgoingEdgesOf(node1)(0)
+ val (_, edge, _) = dag.outgoingEdgesOf(node1).head
if (edge == Direct) {
val opList = OpChain(node1.ops ++ node2.ops)
dag.addVertex(opList)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
index 4e3600f..d1cc5c8 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
@@ -19,9 +19,8 @@
package org.apache.gearpump.streaming.source
import akka.actor.ActorSystem
-
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.{Constants, Processor}
/**
* Utility that helps user to create a DAG starting with [[DataSourceTask]]
@@ -42,8 +41,8 @@ object DataSourceProcessor {
parallelism: Int = 1,
description: String = "",
taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
- : Processor[DataSourceTask] = {
- Processor[DataSourceTask](parallelism, description = description,
- taskConf.withValue[DataSource](DataSourceTask.DATA_SOURCE, dataSource))
+ : Processor[DataSourceTask[Any, Any]] = {
+ Processor[DataSourceTask[Any, Any]](parallelism, description,
+ taskConf.withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 5d1a11e..fb2d898 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -19,17 +19,12 @@
package org.apache.gearpump.streaming.source
import java.time.Instant
-import java.util.concurrent.TimeUnit
import org.apache.gearpump._
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{UpstreamMinClock, Task, TaskContext}
-
-import scala.concurrent.duration._
-
-object DataSourceTask {
- val DATA_SOURCE = "data_source"
-}
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.plan.OpTranslator.{DummyInputFunction, SingleInputFunction}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
/**
* Default Task container for [[org.apache.gearpump.streaming.source.DataSource]] that
@@ -43,14 +38,39 @@ object DataSourceTask {
* - `DataSource.read()` in each `onNext`, which reads a batch of messages
* - `DataSource.close()` in `onStop`
*/
-class DataSourceTask private[source](context: TaskContext, conf: UserConfig, source: DataSource)
+class DataSourceTask[IN, OUT] private[source](
+ context: TaskContext,
+ conf: UserConfig,
+ source: DataSource,
+ operator: Option[SingleInputFunction[IN, OUT]])
extends Task(context, conf) {
def this(context: TaskContext, conf: UserConfig) = {
- this(context, conf, conf.getValue[DataSource](DataSourceTask.DATA_SOURCE)(context.system).get)
+ this(context, conf,
+ conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get,
+ conf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system)
+ )
}
+
private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
+ private val processMessage: Message => Unit =
+ operator match {
+ case Some(op) =>
+ op match {
+ case bad: DummyInputFunction[IN] =>
+ (message: Message) => context.output(message)
+ case _ =>
+ (message: Message) => {
+ op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT =>
+ context.output(Message(m, message.timestamp))
+ }
+ }
+ }
+ case None =>
+ (message: Message) => context.output(message)
+ }
+
override def onStart(startTime: Instant): Unit = {
LOG.info(s"opening data source at $startTime")
source.open(context, startTime)
@@ -58,11 +78,9 @@ class DataSourceTask private[source](context: TaskContext, conf: UserConfig, sou
self ! Watermark(source.getWatermark)
}
- override def onNext(message: Message): Unit = {
+ override def onNext(m: Message): Unit = {
0.until(batchSize).foreach { _ =>
- Option(source.read()).foreach { msg =>
- context.output(msg)
- }
+ Option(source.read()).foreach(processMessage)
}
self ! Watermark(source.getWatermark)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
index dd286de..e919a34 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.dsl
import akka.actor.ActorSystem
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator.SourceTask
+import org.apache.gearpump.streaming.source.DataSourceTask
import org.mockito.Mockito.when
import org.scalatest._
import org.scalatest.mock.MockitoSugar
@@ -60,7 +60,7 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M
val parallism = 3
app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ + _)
val task = app.plan.dag.vertices.iterator.next()
- assert(task.taskClass == classOf[SourceTask[_, _]].getName)
+ assert(task.taskClass == classOf[DataSourceTask[_, _]].getName)
assert(task.parallelism == parallism)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
index 82979e0..816feef 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
@@ -26,6 +26,7 @@ import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner}
import org.apache.gearpump.streaming.dsl.StreamSpec.Join
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
+import org.apache.gearpump.streaming.source.DataSourceTask
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.Graph
import org.apache.gearpump.util.Graph._
@@ -40,7 +41,7 @@ import scala.util.{Either, Left, Right}
class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
- implicit var system: ActorSystem = null
+ implicit var system: ActorSystem = _
override def beforeAll(): Unit = {
system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
@@ -75,7 +76,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
val query = app.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
stream.merge(query).process[(String, Int)](classOf[Join], 1)
- val appDescription = app.plan
+ val appDescription = app.plan()
val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) =>
edge.partitionerFactory.partitioner.getClass.getName
@@ -87,7 +88,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
}
private def getExpectedDagTopology: Graph[String, String] = {
- val source = classOf[SourceTask[_, _]].getName
+ val source = classOf[DataSourceTask[_, _]].getName
val group = classOf[GroupByTask[_, _, _]].getName
val merge = classOf[TransformTask[_, _]].getName
val join = classOf[Join].getName
@@ -108,7 +109,7 @@ object StreamSpec {
class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
- var query: String = null
+ var query: String = _
override def onNext(msg: Message): Unit = {
msg.msg match {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
index 144df0f..2112fd0 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
@@ -22,19 +22,18 @@ import java.time.Instant
import scala.concurrent.Await
import scala.concurrent.duration.Duration
-
import akka.actor.ActorSystem
import org.mockito.ArgumentCaptor
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest._
-
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.dsl.CollectionDataSource
import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
+import org.apache.gearpump.streaming.source.DataSourceTask
class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
@@ -69,25 +68,31 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
"Source" should "iterate over input source and apply attached operator" in {
val taskContext = MockUtil.mockTaskContext
+ implicit val actorSystem = MockUtil.system
- val conf = UserConfig.empty
val data = "one two three".split("\\s")
+ val dataSource = new CollectionDataSource[String](data)
+ val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
// Source with no transformer
- val source = new SourceTask[String, String](new CollectionDataSource[String](data), None,
+ val source = new DataSourceTask[String, String](
taskContext, conf)
source.onStart(Instant.EPOCH)
source.onNext(Message("next"))
- verify(taskContext, times(1)).output(anyObject())
+ data.foreach { s =>
+ verify(taskContext, times(1)).output(Message(s))
+ }
// Source with transformer
val anotherTaskContext = MockUtil.mockTaskContext
val double = new FlatMapFunction[String, String](word => List(word, word), "double")
- val another = new SourceTask(new CollectionDataSource[String](data), Some(double),
- anotherTaskContext, conf)
+ val another = new DataSourceTask(anotherTaskContext,
+ conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
another.onStart(Instant.EPOCH)
another.onNext(Message("next"))
- verify(anotherTaskContext, times(2)).output(anyObject())
+ data.foreach { s =>
+ verify(anotherTaskContext, times(2)).output(Message(s))
+ }
}
"GroupByTask" should "group input by groupBy Function and " +
@@ -95,8 +100,6 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
val data = "1 2 2 3 3 3"
- var map = Map.empty[String, Int]
-
val concat = new ReduceFunction[String]({ (left, right) =>
left + right
}, "concat")
@@ -119,7 +122,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
import scala.collection.JavaConverters._
- val values = peopleCaptor.getAllValues().asScala.map(input => input.msg.asInstanceOf[String])
+ val values = peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String])
assert(values.mkString(",") == "1,2,22,3,33,333")
system.terminate()
Await.result(system.whenTerminated, Duration.Inf)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index ae9bf37..c786047 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -39,7 +39,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
- val sourceTask = new DataSourceTask(taskContext, config, dataSource)
+ val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None)
sourceTask.onStart(startTime)
verify(dataSource).open(taskContext, startTime)
@@ -54,7 +54,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
- val sourceTask = new DataSourceTask(taskContext, config, dataSource)
+ val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None)
val msg = Message(str)
when(dataSource.read()).thenReturn(msg)
@@ -69,7 +69,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
val dataSource = mock[DataSource]
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
- val sourceTask = new DataSourceTask(taskContext, config, dataSource)
+ val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None)
sourceTask.onStop()
verify(dataSource).close()