Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/09/06 07:19:07 UTC

incubator-gearpump git commit: [GEARPUMP-203] Use DataSourceTask and DataSinkTask for DSL

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 5bf7c7cb6 -> 584a2ca23


[GEARPUMP-203] Use DataSourceTask and DataSinkTask for DSL

Author: manuzhang <ow...@gmail.com>

Closes #80 from manuzhang/window_dsl.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/584a2ca2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/584a2ca2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/584a2ca2

Branch: refs/heads/master
Commit: 584a2ca23180add9f7454334907a10c8144565de
Parents: 5bf7c7c
Author: manuzhang <ow...@gmail.com>
Authored: Tue Sep 6 15:19:00 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Sep 6 15:19:00 2016 +0800

----------------------------------------------------------------------
 .../gearpump/streaming/javaapi/Processor.java   |  3 +-
 .../apache/gearpump/streaming/Constants.scala   |  3 +-
 .../streaming/dsl/plan/OpTranslator.scala       | 92 +++-----------------
 .../gearpump/streaming/dsl/plan/Planner.scala   |  4 +-
 .../streaming/source/DataSourceProcessor.scala  |  9 +-
 .../streaming/source/DataSourceTask.scala       | 46 +++++++---
 .../gearpump/streaming/dsl/StreamAppSpec.scala  |  4 +-
 .../gearpump/streaming/dsl/StreamSpec.scala     |  9 +-
 .../streaming/dsl/plan/OpTranslatorSpec.scala   | 25 +++---
 .../streaming/source/DataSourceTaskSpec.scala   |  6 +-
 10 files changed, 77 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java
index 8757081..59b375f 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java
@@ -73,7 +73,8 @@ public class Processor<T extends org.apache.gearpump.streaming.task.Task> implem
    * @return the new created source processor
    */
   public static Processor<DataSourceTask> source(DataSource source, int parallelism, String description, UserConfig taskConf, ActorSystem system) {
-    org.apache.gearpump.streaming.Processor<DataSourceTask> p = DataSourceProcessor.apply(source, parallelism, description, taskConf, system);
+    org.apache.gearpump.streaming.Processor<DataSourceTask<Object, Object>> p =
+        DataSourceProcessor.apply(source, parallelism, description, taskConf, system);
     return new Processor(p);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
index 320e46f..cd33b50 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -20,8 +20,7 @@ package org.apache.gearpump.streaming
 object Constants {
 
   val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator"
-  val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.dsl.source"
-  val GEARPUMP_STREAMING_SINK = "gearpump.streaming.dsl.sink"
+  val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source"
   val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function"
 
   val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
index 6bd0da2..8de291c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
@@ -18,8 +18,6 @@
 
 package org.apache.gearpump.streaming.dsl.plan
 
-import java.time.Instant
-
 import scala.collection.TraversableOnce
 import akka.actor.ActorSystem
 import org.slf4j.Logger
@@ -30,8 +28,8 @@ import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
 import org.apache.gearpump.streaming.dsl.op._
 import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.sink.DataSinkProcessor
+import org.apache.gearpump.streaming.source.DataSourceTask
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.LogUtil
 
@@ -52,26 +50,24 @@ class OpTranslator extends java.io.Serializable {
         val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)
 
         op match {
-          case DataSourceOp(dataSource, parallism, conf, description) =>
-            Processor[SourceTask[Object, Object]](parallism,
+          case DataSourceOp(dataSource, parallelism, conf, description) =>
+            Processor[DataSourceTask[Any, Any]](parallelism,
               description = description + "." + func.description,
               userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
-          case groupby@GroupByOp(_, parallism, description, _) =>
-            Processor[GroupByTask[Object, Object, Object]](parallism,
+          case groupby@GroupByOp(_, parallelism, description, _) =>
+            Processor[GroupByTask[Object, Object, Object]](parallelism,
               description = description + "." + func.description,
               userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby))
           case merge: MergeOp =>
             Processor[TransformTask[Object, Object]](1,
               description = op.description + "." + func.description,
               userConfig)
-          case ProcessorOp(processor, parallism, conf, description) =>
-            DefaultProcessor(parallism,
+          case ProcessorOp(processor, parallelism, conf, description) =>
+            DefaultProcessor(parallelism,
               description = description + "." + func.description,
               userConfig, processor)
           case DataSinkOp(dataSink, parallelism, conf, description) =>
-            Processor[SinkTask[Object]](parallelism,
-              description = description + func.description,
-              userConfig.withValue(GEARPUMP_STREAMING_SINK, dataSink))
+            DataSinkProcessor(dataSink, parallelism, description + func.description)
         }
       case op: SlaveOp[_] =>
         val func = toFunction(ops.ops)
@@ -156,7 +152,7 @@ object OpTranslator {
   class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String)
     extends SingleInputFunction[T, T] {
 
-    private var state: Any = null
+    private var state: Any = _
 
     override def process(value: T): TraversableOnce[T] = {
       if (state == null) {
@@ -200,50 +196,6 @@ object OpTranslator {
     }
   }
 
-  class SourceTask[T, OUT](
-      source: DataSource, operator: Option[SingleInputFunction[T, OUT]], taskContext: TaskContext,
-      userConf: UserConfig)
-    extends Task(taskContext, userConf) {
-
-    def this(taskContext: TaskContext, userConf: UserConfig) = {
-      this(
-        userConf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(taskContext.system).get,
-        userConf.getValue[SingleInputFunction[T, OUT]](GEARPUMP_STREAMING_OPERATOR)(
-          taskContext.system),
-        taskContext, userConf)
-    }
-
-    override def onStart(startTime: Instant): Unit = {
-      source.open(taskContext, startTime)
-      self ! Message("start", System.currentTimeMillis())
-    }
-
-    override def onNext(msg: Message): Unit = {
-      val time = System.currentTimeMillis()
-      Option(source.read()).foreach { msg =>
-        operator match {
-          case Some(operator) =>
-            operator match {
-              case bad: DummyInputFunction[T] =>
-                taskContext.output(msg)
-              case _ =>
-                operator.process(msg.msg.asInstanceOf[T]).foreach(msg => {
-                  taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
-                })
-            }
-          case None =>
-            taskContext.output(msg)
-        }
-      }
-
-      self ! Message("next", System.currentTimeMillis())
-    }
-
-    override def onStop(): Unit = {
-      source.close()
-    }
-  }
-
   class TransformTask[IN, OUT](
       operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
       userConf: UserConfig) extends Task(taskContext, userConf) {
@@ -257,8 +209,8 @@ object OpTranslator {
       val time = msg.timestamp
 
       operator match {
-        case Some(operator) =>
-          operator.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
+        case Some(op) =>
+          op.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
             taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
           }
         case None =>
@@ -267,24 +219,4 @@ object OpTranslator {
     }
   }
 
-  class SinkTask[T](dataSink: DataSink, taskContext: TaskContext, userConf: UserConfig)
-    extends Task(taskContext, userConf) {
-
-    def this(taskContext: TaskContext, userConf: UserConfig) = {
-      this(userConf.getValue[DataSink](GEARPUMP_STREAMING_SINK)(taskContext.system).get,
-        taskContext, userConf)
-    }
-
-    override def onStart(startTime: Instant): Unit = {
-      dataSink.open(taskContext)
-    }
-
-    override def onNext(msg: Message): Unit = {
-      dataSink.write(msg)
-    }
-
-    override def onStop(): Unit = {
-      dataSink.close()
-    }
-  }
 }
\ No newline at end of file
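
A note on the hunk above: with SourceTask and SinkTask removed, the DataSource and the fused SingleInputFunction both travel to DataSourceTask through UserConfig under the renamed Constants keys. A minimal sketch of that wiring, assuming an implicit ActorSystem is in scope; the helper name is illustrative, while CollectionDataSource and FlatMapFunction are the same helpers the tests in this commit use:

    import akka.actor.ActorSystem
    import org.apache.gearpump.cluster.UserConfig
    import org.apache.gearpump.streaming.Constants._
    import org.apache.gearpump.streaming.dsl.CollectionDataSource
    import org.apache.gearpump.streaming.dsl.plan.OpTranslator.FlatMapFunction

    object SourceConfigSketch {
      // Hypothetical helper (not part of this commit): builds the UserConfig that
      // DataSourceTask's auxiliary constructor reads. GEARPUMP_STREAMING_SOURCE
      // carries the DataSource; GEARPUMP_STREAMING_OPERATOR carries the optional
      // chained operator applied to each message read from the source.
      def sourceTaskConf()(implicit system: ActorSystem): UserConfig = {
        val data = "one two three".split("\\s")
        val source = new CollectionDataSource[String](data)
        val double = new FlatMapFunction[String, String](word => List(word, word), "double")
        UserConfig.empty
          .withValue(GEARPUMP_STREAMING_SOURCE, source)
          .withValue(GEARPUMP_STREAMING_OPERATOR, double)
      }
    }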

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index 3af5e97..f5bbd65 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -29,7 +29,7 @@ import org.apache.gearpump.util.Graph
 
 class Planner {
 
-  /*
+  /**
    * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low
    * level Graph API.
    */
@@ -74,7 +74,7 @@ class Planner {
       dag.inDegreeOf(node2) == 1 &&
       // For processor node, we don't allow it to merge with downstream operators
       !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) {
-      val (_, edge, _) = dag.outgoingEdgesOf(node1)(0)
+      val (_, edge, _) = dag.outgoingEdgesOf(node1).head
       if (edge == Direct) {
         val opList = OpChain(node1.ops ++ node2.ops)
         dag.addVertex(opList)
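
The chaining condition above is easy to miss in diff form. A simplified, self-contained restatement of the visible part of the check, using stand-in types rather than the actual Planner API:

    object ChainingSketch {
      // Simplified stand-ins; the real Planner operates on the Op graph directly.
      sealed trait OpEdge
      case object Direct extends OpEdge
      case object Shuffle extends OpEdge

      final case class Chain(ops: List[String], headIsProcessorOp: Boolean)

      // Mirrors the visible condition in the hunk: the downstream chain has a
      // single inbound edge, the upstream chain does not start with a user
      // ProcessorOp, and the connecting edge is Direct; only then are the two op
      // lists concatenated so they run inside one task.
      def fuse(node1: Chain, node2: Chain, inDegreeOfNode2: Int, edge: OpEdge): Option[Chain] =
        if (inDegreeOfNode2 == 1 && !node1.headIsProcessorOp && edge == Direct) {
          Some(Chain(node1.ops ++ node2.ops, node1.headIsProcessorOp))
        } else {
          None
        }
    }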

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
index 4e3600f..d1cc5c8 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
@@ -19,9 +19,8 @@
 package org.apache.gearpump.streaming.source
 
 import akka.actor.ActorSystem
-
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.{Constants, Processor}
 
 /**
  * Utility that helps user to create a DAG starting with [[DataSourceTask]]
@@ -42,8 +41,8 @@ object DataSourceProcessor {
       parallelism: Int = 1,
       description: String = "",
       taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
-    : Processor[DataSourceTask] = {
-    Processor[DataSourceTask](parallelism, description = description,
-      taskConf.withValue[DataSource](DataSourceTask.DATA_SOURCE, dataSource))
+    : Processor[DataSourceTask[Any, Any]] = {
+    Processor[DataSourceTask[Any, Any]](parallelism, description,
+      taskConf.withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource))
   }
 }
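
Usage-wise, the factory now yields a Processor[DataSourceTask[Any, Any]] and stores the DataSource under Constants.GEARPUMP_STREAMING_SOURCE instead of the removed DataSourceTask.DATA_SOURCE key. A minimal sketch, assuming an implicit ActorSystem and reusing CollectionDataSource from the DSL tests (the method and description names are illustrative):

    import akka.actor.ActorSystem
    import org.apache.gearpump.cluster.UserConfig
    import org.apache.gearpump.streaming.Processor
    import org.apache.gearpump.streaming.dsl.CollectionDataSource
    import org.apache.gearpump.streaming.source.{DataSourceProcessor, DataSourceTask}

    object SourceProcessorSketch {
      // Hypothetical example: the DataSource ends up in the task's UserConfig under
      // Constants.GEARPUMP_STREAMING_SOURCE, where DataSourceTask's auxiliary
      // constructor looks it up at runtime.
      def wordSource(parallelism: Int)(implicit system: ActorSystem)
        : Processor[DataSourceTask[Any, Any]] = {
        val data = "one two three".split("\\s")
        DataSourceProcessor(new CollectionDataSource[String](data), parallelism,
          "word-source", UserConfig.empty)
      }
    }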

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 5d1a11e..fb2d898 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -19,17 +19,12 @@
 package org.apache.gearpump.streaming.source
 
 import java.time.Instant
-import java.util.concurrent.TimeUnit
 
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{UpstreamMinClock, Task, TaskContext}
-
-import scala.concurrent.duration._
-
-object DataSourceTask {
-  val DATA_SOURCE = "data_source"
-}
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.plan.OpTranslator.{DummyInputFunction, SingleInputFunction}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 /**
  * Default Task container for [[org.apache.gearpump.streaming.source.DataSource]] that
@@ -43,14 +38,39 @@ object DataSourceTask {
  *  - `DataSource.read()` in each `onNext`, which reads a batch of messages
  *  - `DataSource.close()` in `onStop`
  */
-class DataSourceTask private[source](context: TaskContext, conf: UserConfig, source: DataSource)
+class DataSourceTask[IN, OUT] private[source](
+    context: TaskContext,
+    conf: UserConfig,
+    source: DataSource,
+    operator: Option[SingleInputFunction[IN, OUT]])
   extends Task(context, conf) {
 
   def this(context: TaskContext, conf: UserConfig) = {
-    this(context, conf, conf.getValue[DataSource](DataSourceTask.DATA_SOURCE)(context.system).get)
+    this(context, conf,
+      conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get,
+      conf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system)
+    )
   }
+
   private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
 
+  private val processMessage: Message => Unit =
+    operator match {
+      case Some(op) =>
+        op match {
+          case bad: DummyInputFunction[IN] =>
+            (message: Message) => context.output(message)
+          case _ =>
+            (message: Message) => {
+              op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT =>
+                context.output(Message(m, message.timestamp))
+              }
+            }
+        }
+      case None =>
+        (message: Message) => context.output(message)
+    }
+
   override def onStart(startTime: Instant): Unit = {
     LOG.info(s"opening data source at $startTime")
     source.open(context, startTime)
@@ -58,11 +78,9 @@ class DataSourceTask private[source](context: TaskContext, conf: UserConfig, sou
     self ! Watermark(source.getWatermark)
   }
 
-  override def onNext(message: Message): Unit = {
+  override def onNext(m: Message): Unit = {
     0.until(batchSize).foreach { _ =>
-      Option(source.read()).foreach { msg =>
-        context.output(msg)
-      }
+      Option(source.read()).foreach(processMessage)
     }
 
     self ! Watermark(source.getWatermark)
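
For context on the lifecycle the Scaladoc of this file describes (open in onStart, batched read in onNext, close in onStop), here is a minimal DataSource sketch, assuming the trait exposes only the open/read/close/getWatermark members exercised in the hunk above; the class name and behaviour are illustrative, not part of this commit:

    import java.time.Instant

    import org.apache.gearpump.Message
    import org.apache.gearpump.streaming.source.DataSource
    import org.apache.gearpump.streaming.task.TaskContext

    // Illustrative only: replays a fixed list once. DataSourceTask calls open() in
    // onStart, read() up to SOURCE_READ_BATCH_SIZE times per onNext (a null result
    // means there is nothing to emit this round), and close() in onStop.
    class ReplaySource(items: Seq[String]) extends DataSource {
      private var remaining: Seq[String] = Seq.empty

      override def open(context: TaskContext, startTime: Instant): Unit = {
        remaining = items
      }

      override def read(): Message = remaining match {
        case head +: tail =>
          remaining = tail
          Message(head)
        case _ => null
      }

      override def close(): Unit = ()

      override def getWatermark: Instant = Instant.now()
    }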

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
index dd286de..e919a34 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.dsl
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.TestUtil
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator.SourceTask
+import org.apache.gearpump.streaming.source.DataSourceTask
 import org.mockito.Mockito.when
 import org.scalatest._
 import org.scalatest.mock.MockitoSugar
@@ -60,7 +60,7 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M
     val parallism = 3
     app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ + _)
     val task = app.plan.dag.vertices.iterator.next()
-    assert(task.taskClass == classOf[SourceTask[_, _]].getName)
+    assert(task.taskClass == classOf[DataSourceTask[_, _]].getName)
     assert(task.parallelism == parallism)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
index 82979e0..816feef 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
@@ -26,6 +26,7 @@ import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner}
 import org.apache.gearpump.streaming.dsl.StreamSpec.Join
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
+import org.apache.gearpump.streaming.source.DataSourceTask
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph._
@@ -40,7 +41,7 @@ import scala.util.{Either, Left, Right}
 class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
 
 
-  implicit var system: ActorSystem = null
+  implicit var system: ActorSystem = _
 
   override def beforeAll(): Unit = {
     system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
@@ -75,7 +76,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
     val query = app.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
     stream.merge(query).process[(String, Int)](classOf[Join], 1)
 
-    val appDescription = app.plan
+    val appDescription = app.plan()
 
     val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) =>
       edge.partitionerFactory.partitioner.getClass.getName
@@ -87,7 +88,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
   }
 
   private def getExpectedDagTopology: Graph[String, String] = {
-    val source = classOf[SourceTask[_, _]].getName
+    val source = classOf[DataSourceTask[_, _]].getName
     val group = classOf[GroupByTask[_, _, _]].getName
     val merge = classOf[TransformTask[_, _]].getName
     val join = classOf[Join].getName
@@ -108,7 +109,7 @@ object StreamSpec {
 
   class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
 
-    var query: String = null
+    var query: String = _
 
     override def onNext(msg: Message): Unit = {
       msg.msg match {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
index 144df0f..2112fd0 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
@@ -22,19 +22,18 @@ import java.time.Instant
 
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
-
 import akka.actor.ActorSystem
 import org.mockito.ArgumentCaptor
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest._
-
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.dsl.CollectionDataSource
 import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
+import org.apache.gearpump.streaming.source.DataSourceTask
 
 class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
 
@@ -69,25 +68,31 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
   "Source" should "iterate over input source and apply attached operator" in {
 
     val taskContext = MockUtil.mockTaskContext
+    implicit val actorSystem = MockUtil.system
 
-    val conf = UserConfig.empty
     val data = "one two three".split("\\s")
+    val dataSource = new CollectionDataSource[String](data)
+    val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
 
     // Source with no transformer
-    val source = new SourceTask[String, String](new CollectionDataSource[String](data), None,
+    val source = new DataSourceTask[String, String](
       taskContext, conf)
     source.onStart(Instant.EPOCH)
     source.onNext(Message("next"))
-    verify(taskContext, times(1)).output(anyObject())
+    data.foreach { s =>
+      verify(taskContext, times(1)).output(Message(s))
+    }
 
     // Source with transformer
     val anotherTaskContext = MockUtil.mockTaskContext
     val double = new FlatMapFunction[String, String](word => List(word, word), "double")
-    val another = new SourceTask(new CollectionDataSource[String](data), Some(double),
-      anotherTaskContext, conf)
+    val another = new DataSourceTask(anotherTaskContext,
+      conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
     another.onStart(Instant.EPOCH)
     another.onNext(Message("next"))
-    verify(anotherTaskContext, times(2)).output(anyObject())
+    data.foreach { s =>
+      verify(anotherTaskContext, times(2)).output(Message(s))
+    }
   }
 
   "GroupByTask" should "group input by groupBy Function and " +
@@ -95,8 +100,6 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
 
     val data = "1 2  2  3 3  3"
 
-    var map = Map.empty[String, Int]
-
     val concat = new ReduceFunction[String]({ (left, right) =>
       left + right
     }, "concat")
@@ -119,7 +122,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
 
     import scala.collection.JavaConverters._
 
-    val values = peopleCaptor.getAllValues().asScala.map(input => input.msg.asInstanceOf[String])
+    val values = peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String])
     assert(values.mkString(",") == "1,2,22,3,33,333")
     system.terminate()
     Await.result(system.whenTerminated, Duration.Inf)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index ae9bf37..c786047 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -39,7 +39,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
       val config = UserConfig.empty
         .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
 
-      val sourceTask = new DataSourceTask(taskContext, config, dataSource)
+      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None)
 
       sourceTask.onStart(startTime)
       verify(dataSource).open(taskContext, startTime)
@@ -54,7 +54,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
       val config = UserConfig.empty
         .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
 
-      val sourceTask = new DataSourceTask(taskContext, config, dataSource)
+      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None)
       val msg = Message(str)
       when(dataSource.read()).thenReturn(msg)
 
@@ -69,7 +69,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
     val dataSource = mock[DataSource]
     val config = UserConfig.empty
       .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-    val sourceTask = new DataSourceTask(taskContext, config, dataSource)
+    val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None)
 
     sourceTask.onStop()
     verify(dataSource).close()