You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/11 03:58:01 UTC

[1/2] incubator-gearpump git commit: Merge branch 'master' into akka-streams

Repository: incubator-gearpump
Updated Branches:
  refs/heads/akka-streams 4fe5458f4 -> bc3940352


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
new file mode 100644
index 0000000..4b7649f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * Window task driven by event time: every incoming message is handed to a
+ * [[WindowRunner]], and the runner is asked to emit results whenever the upstream
+ * watermark advances.
+ *
+ * @param groupBy grouping and windowing definition for this task
+ * @param windowRunner buffers messages and emits window results on trigger
+ */
+class EventTimeTriggerTask[IN, GROUP](
+    groupBy: GroupAlsoByWindow[IN, GROUP],
+    windowRunner: WindowRunner,
+    taskContext: TaskContext,
+    userConfig: UserConfig)
+  extends Task(taskContext, userConfig) {
+
+  /** Builds the task around a [[DefaultWindowRunner]] for `groupBy`. */
+  def this(groupBy: GroupAlsoByWindow[IN, GROUP], taskContext: TaskContext,
+      userConfig: UserConfig) =
+    this(
+      groupBy,
+      new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
+      taskContext,
+      userConfig)
+
+  /**
+   * Reads the [[GroupAlsoByWindow]] stored in `userConfig`; throws
+   * NoSuchElementException if it is missing.
+   */
+  def this(taskContext: TaskContext, userConfig: UserConfig) =
+    this(
+      userConfig
+        .getValue[GroupAlsoByWindow[IN, GROUP]](GEARPUMP_STREAMING_GROUPBY_FUNCTION)(
+          taskContext.system)
+        .get,
+      taskContext,
+      userConfig)
+
+  override def onNext(message: Message): Unit = windowRunner.process(message)
+
+  override def onWatermarkProgress(watermark: Instant): Unit = windowRunner.trigger(watermark)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
new file mode 100644
index 0000000..980a54b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import akka.actor.Actor.Receive
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
+import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFn
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+import scala.concurrent.duration.FiniteDuration
+
+object ProcessingTimeTriggerTask {
+
+  /** Self-message scheduled by the task to fire its window runner. */
+  case object Triggering
+}
+
+/**
+ * This task triggers output on a wall-clock (processing time) schedule.
+ *
+ * Window starts are multiples of the window step (see `SlidingWindowFn`), so window ends
+ * fall on `size + k * step`. Each trigger is scheduled for the next such end, and after
+ * firing the following trigger is re-aligned to the next end. Scheduling latency
+ * therefore does not accumulate into drift.
+ *
+ * Only [[SlidingWindowFn]] windows (which also back fixed windows) are supported; any
+ * other window function is rejected with an IllegalArgumentException at construction.
+ */
+class ProcessingTimeTriggerTask[IN, GROUP](
+    groupBy: GroupAlsoByWindow[IN, GROUP],
+    windowRunner: WindowRunner,
+    taskContext: TaskContext,
+    userConfig: UserConfig)
+  extends Task(taskContext, userConfig) {
+
+  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
+      taskContext: TaskContext, userConfig: UserConfig) = {
+    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
+      taskContext, userConfig)
+  }
+
+  def this(taskContext: TaskContext, userConfig: UserConfig) = {
+    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
+      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
+      taskContext, userConfig)
+  }
+
+  private val windowFn: SlidingWindowFn = groupBy.window.windowFn match {
+    case sliding: SlidingWindowFn => sliding
+    case other =>
+      // Fail fast with a readable message instead of an opaque ClassCastException.
+      throw new IllegalArgumentException(
+        s"ProcessingTimeTrigger requires a sliding or fixed window, but got $other")
+  }
+  private val windowSizeMs = windowFn.size.toMillis
+  private val windowStepMs = windowFn.step.toMillis
+
+  override def onStart(startTime: Instant): Unit = {
+    scheduleTrigger(delayToNextWindowEnd(Instant.now.toEpochMilli))
+  }
+
+  override def onNext(message: Message): Unit = {
+    windowRunner.process(message)
+  }
+
+  override def receiveUnManagedMessage: Receive = {
+    case Triggering =>
+      val now = Instant.now
+      windowRunner.trigger(now)
+      scheduleTrigger(delayToNextWindowEnd(now.toEpochMilli))
+  }
+
+  /**
+   * Milliseconds from `nowMs` until the next window end strictly after it.
+   *
+   * Window ends satisfy `end mod step == size mod step`. A zero result is pushed out by
+   * one step so the next trigger is always in the future. For fixed windows
+   * (size == step) and non-negative `nowMs`, this equals `size - nowMs % size`.
+   */
+  private def delayToNextWindowEnd(nowMs: Long): Long = {
+    val delay = Math.floorMod(windowSizeMs - nowMs, windowStepMs)
+    if (delay == 0) windowStepMs else delay
+  }
+
+  /** Sends [[ProcessingTimeTriggerTask.Triggering]] to this task after `delayMs`. */
+  private def scheduleTrigger(delayMs: Long): Unit = {
+    taskContext.scheduleOnce(
+      new FiniteDuration(delayMs, TimeUnit.MILLISECONDS))(self ! Triggering)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
new file mode 100644
index 0000000..e35f085
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * Applies an optional single-input function to each incoming message.
+ *
+ * Every output of the function is emitted with the input message's timestamp. When
+ * no function is configured, the payload is forwarded unchanged, also with the input
+ * timestamp.
+ *
+ * @param operator function to apply, or None to pass messages through
+ */
+class TransformTask[IN, OUT](
+    operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
+    userConf: UserConfig) extends Task(taskContext, userConf) {
+
+  /** Reads the (optional) operator stored in `userConf`. */
+  def this(taskContext: TaskContext, userConf: UserConfig) =
+    this(
+      userConf.getValue[SingleInputFunction[IN, OUT]](
+        GEARPUMP_STREAMING_OPERATOR)(taskContext.system),
+      taskContext,
+      userConf)
+
+  override def onNext(msg: Message): Unit = {
+    val timestamp = msg.timestamp
+    def emit(payload: Any): Unit = taskContext.output(new Message(payload, timestamp))
+
+    operator.fold(emit(msg.msg)) { fn =>
+      fn.process(msg.msg.asInstanceOf[IN]).foreach(out => emit(out))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
new file mode 100644
index 0000000..a4524a8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+/**
+ * Controls what happens to a group's reduce-function state after a window fires.
+ */
+sealed trait AccumulationMode
+
+/**
+ * State is not cleared after a window fires. It therefore carries over into later
+ * firings for the same group.
+ */
+case object Accumulating extends AccumulationMode
+
+/** State is cleared (`clearState`) after every firing. */
+case object Discarding extends AccumulationMode

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
new file mode 100644
index 0000000..30e68ba
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.task.Task
+
+/**
+ * Divides messages into groups according to their payload and timestamp.
+ * Check [[org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow]]
+ * for default implementation.
+ *
+ * @tparam T the message payload type
+ * @tparam GROUP the group key type produced by `groupBy`
+ */
+trait GroupByFn[T, GROUP] {
+
+  /**
+   * Used by
+   *   1. GroupByPartitioner to shuffle messages
+   *   2. WindowRunner to group messages for time-based aggregation
+   *
+   * @param message the message to classify
+   * @return the group key of `message`
+   */
+  def groupBy(message: Message): GROUP
+
+  /**
+   * Returns a Processor according to window trigger during planning.
+   *
+   * @param parallelism number of task instances for the processor
+   * @param description processor description
+   * @param userConfig base configuration for the processor's tasks; implementations may
+   *                   add entries before handing it on
+   */
+  def getProcessor(parallelism: Int, description: String,
+      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task]
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
new file mode 100644
index 0000000..9865e18
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+/**
+ * Decides when window results are emitted. The trigger also selects which task type
+ * runs the windowed aggregation (see `GroupAlsoByWindow.getProcessor`).
+ */
+sealed trait Trigger
+
+/** Fires on upstream watermark progress; runs in `EventTimeTriggerTask`. */
+case object EventTimeTrigger extends Trigger
+
+/** Fires on a wall-clock schedule; runs in `ProcessingTimeTriggerTask`. */
+case object ProcessingTimeTrigger extends Trigger
+
+/** Selected by `CountWindow`; runs in `CountTriggerTask`. */
+case object CountTrigger extends Trigger
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
new file mode 100644
index 0000000..4b94879
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import java.time.Duration
+
+/**
+ * A window definition. It describes how messages are assigned to buckets, when buckets
+ * fire, and whether per-group state survives a firing.
+ *
+ * @param windowFn assigns each message timestamp to one or more buckets
+ * @param trigger decides when results are emitted; defaults to [[EventTimeTrigger]]
+ * @param accumulationMode [[Discarding]] (default) clears per-group state after each
+ *                         firing; [[Accumulating]] keeps it
+ */
+case class Window(
+    windowFn: WindowFn,
+    trigger: Trigger = EventTimeTrigger,
+    accumulationMode: AccumulationMode = Discarding) {
+
+  /** Returns a copy that fires on `trigger`, keeping the window function and accumulation mode. */
+  def triggering(trigger: Trigger): Window = {
+    // copy() preserves accumulationMode; rebuilding with Window(windowFn, trigger) would
+    // silently reset it to the Discarding default (e.g. after `.accumulating`).
+    copy(trigger = trigger)
+  }
+
+  /** Returns a copy that keeps per-group state across firings. */
+  def accumulating: Window = {
+    copy(accumulationMode = Accumulating)
+  }
+
+  /** Returns a copy that clears per-group state after each firing. */
+  def discarding: Window = {
+    copy(accumulationMode = Discarding)
+  }
+}
+
+/** Factory for count-based windows. */
+object CountWindow {
+
+  /**
+   * Defines a count-based window.
+   *
+   * @param size window size handed to [[CountWindowFn]]
+   * @return a Window definition fired by [[CountTrigger]]
+   */
+  def apply(size: Int): Window =
+    Window(windowFn = CountWindowFn(size), trigger = CountTrigger)
+}
+
+/** Factory for fixed (non-overlapping) time windows. */
+object FixedWindow {
+
+  /**
+   * Defines a fixed window: a sliding window whose step equals its size.
+   *
+   * @param size window size
+   * @return a Window definition using the default trigger and accumulation mode
+   */
+  def apply(size: Duration): Window =
+    Window(windowFn = SlidingWindowFn(size, step = size))
+}
+
+/** Factory for sliding time windows. */
+object SlidingWindow {
+
+  /**
+   * Defines a sliding window.
+   *
+   * @param size length of each window
+   * @param step distance the window slides forward between consecutive windows
+   * @return a Window definition using the default trigger and accumulation mode
+   */
+  def apply(size: Duration, step: Duration): Window = {
+    val slidingFn = SlidingWindowFn(size = size, step = step)
+    Window(windowFn = slidingFn)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
new file mode 100644
index 0000000..0768730
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.dsl.window.impl.Bucket
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Maps a message timestamp to the window buckets the message belongs to.
+ */
+sealed trait WindowFn {
+  def apply(timestamp: Instant): List[Bucket]
+}
+
+/**
+ * Time windows of length `size` whose start times are multiples of `step`, in epoch
+ * milliseconds.
+ *
+ *  - `size == step` gives fixed, non-overlapping windows.
+ *  - `step < size` gives overlapping windows.
+ *  - `step > size` leaves gaps between windows.
+ *
+ * @param size window length; must be at least 1 ms
+ * @param step distance between consecutive window starts; must be at least 1 ms
+ */
+case class SlidingWindowFn(size: Duration, step: Duration)
+  extends WindowFn {
+
+  // Both values are used in whole milliseconds below. A value under 1 ms truncates to 0,
+  // which would make `step` a zero divisor or `size` produce empty windows.
+  require(size.toMillis > 0, s"window size must be at least 1 ms, but got $size")
+  require(step.toMillis > 0, s"window step must be at least 1 ms, but got $step")
+
+  def this(size: Duration) = {
+    this(size, size)
+  }
+
+  /**
+   * Returns every bucket `[start, start + size)` that contains `timestamp`, with the
+   * latest start first. When `step > size`, a timestamp in a gap between windows belongs
+   * to no bucket and the result is empty.
+   */
+  override def apply(timestamp: Instant): List[Bucket] = {
+    val sizeMillis = size.toMillis
+    val stepMillis = step.toMillis
+    val timeMillis = timestamp.toEpochMilli
+    val windows = ArrayBuffer.empty[Bucket]
+    var start = lastStartFor(timeMillis, stepMillis)
+    // Walk back one step at a time for as long as the window still covers the timestamp.
+    while (start + sizeMillis > timeMillis) {
+      windows += Bucket.ofEpochMilli(start, start + sizeMillis)
+      start -= stepMillis
+    }
+    windows.toList
+  }
+
+  /**
+   * Latest multiple of `windowStep` that is <= `timestamp`. floorMod keeps this correct
+   * for negative timestamps and avoids overflowing `timestamp + windowStep`.
+   */
+  private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = {
+    timestamp - Math.floorMod(timestamp, windowStep)
+  }
+}
+
+/**
+ * Window function for count windows.
+ *
+ * The timestamp is ignored: every call returns the same single bucket `[0, size)`,
+ * with bounds expressed in epoch milliseconds.
+ */
+case class CountWindowFn(size: Int) extends WindowFn {
+
+  override def apply(timestamp: Instant): List[Bucket] =
+    Bucket.ofEpochMilli(0, size) :: Nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
new file mode 100644
index 0000000..e978983
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.dsl.window.api.Trigger
+
+/**
+ * Runner for a reduce function over windowed input.
+ * NOTE(review): semantics below are inferred from the method names only; confirm
+ * against implementations.
+ */
+trait ReduceFnRunner {
+
+  /** Hands one incoming message to the runner. */
+  def process(message: Message): Unit
+
+  /** Invoked when `trigger` fires; presumably emits pending results (confirm). */
+  def onTrigger(trigger: Trigger): Unit
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
new file mode 100644
index 0000000..53cf5d0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, EventTimeTriggerTask, ProcessingTimeTriggerTask}
+import org.apache.gearpump.streaming.task.Task
+
+object Bucket {
+
+  /** Builds a [[Bucket]] from epoch-millisecond bounds (start included, end excluded). */
+  def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Bucket = {
+    val start = Instant.ofEpochMilli(startTime)
+    val end = Instant.ofEpochMilli(endTime)
+    Bucket(start, end)
+  }
+}
+
+/**
+ * A window unit covering `[startTime, endTime)`: the start is included and the end
+ * excluded. Buckets are ordered by start time, then by end time.
+ */
+case class Bucket(startTime: Instant, endTime: Instant) extends Comparable[Bucket] {
+  override def compareTo(o: Bucket): Int = {
+    val byStart = startTime.compareTo(o.startTime)
+    if (byStart == 0) endTime.compareTo(o.endTime) else byStart
+  }
+}
+
+/**
+ * Groups messages by a user-defined key and also by the window buckets their
+ * timestamps fall into. The window's trigger selects which task type runs the result.
+ *
+ * @param groupByFn extracts the group key from a message payload
+ * @param window window definition (bucket assignment, trigger, accumulation mode)
+ */
+case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Window)
+  extends GroupByFn[T, (GROUP, List[Bucket])] {
+
+  override def groupBy(message: Message): (GROUP, List[Bucket]) = {
+    val timestamp = Instant.ofEpochMilli(message.timestamp)
+    (groupByFn(message.msg.asInstanceOf[T]), window.windowFn(timestamp))
+  }
+
+  /** Stores this grouping in the config and picks the task class matching the trigger. */
+  override def getProcessor(parallelism: Int, description: String,
+      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = {
+    val withGroupBy = userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, this)
+    window.trigger match {
+      case EventTimeTrigger =>
+        Processor[EventTimeTriggerTask[T, GROUP]](parallelism, description, withGroupBy)
+      case ProcessingTimeTrigger =>
+        Processor[ProcessingTimeTriggerTask[T, GROUP]](parallelism, description, withGroupBy)
+      case CountTrigger =>
+        Processor[CountTriggerTask[T, GROUP]](parallelism, description, withGroupBy)
+    }
+  }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
new file mode 100644
index 0000000..9af5e61
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.gs.collections.api.block.procedure.Procedure
+import org.apache.gearpump.gs.collections.impl.list.mutable.FastList
+import org.apache.gearpump.gs.collections.impl.map.mutable.UnifiedMap
+import org.apache.gearpump.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.window.api.Discarding
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+/**
+ * Buffers windowed input and emits results when asked to trigger.
+ * [[DefaultWindowRunner]] is the implementation used by the trigger tasks.
+ */
+trait WindowRunner {
+
+  /** Records `message` for later aggregation. */
+  def process(message: Message): Unit
+
+  /**
+   * Emits results for windows that are complete at `time`. In [[DefaultWindowRunner]],
+   * these are buckets whose end time is at or before `time`.
+   */
+  def trigger(time: Instant): Unit
+
+}
+
+object DefaultWindowRunner {
+
+  private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
+
+  /**
+   * Key pairing a window bucket with a group.
+   *
+   * Keys are ordered by bucket, then by group hash code (`##`), then by the group's
+   * string form. Equal groups always compare as 0. Returning a fixed value such as -1
+   * for any two unequal groups would make both `a < b` and `b < a` hold. That breaks
+   * sorted maps: lookups miss existing keys and duplicates get inserted.
+   *
+   * NOTE(review): this assumes `toString` agrees with `equals`. Two unequal groups with
+   * the same hash code and the same string form compare as 0, and a sorted map would
+   * merge them.
+   */
+  case class WindowGroup[GROUP](bucket: Bucket, group: GROUP)
+    extends Comparable[WindowGroup[GROUP]] {
+    override def compareTo(o: WindowGroup[GROUP]): Int = {
+      val byBucket = bucket.compareTo(o.bucket)
+      if (byBucket != 0) {
+        byBucket
+      } else if (group == o.group) {
+        0
+      } else {
+        val byHash = Integer.compare(group.##, o.group.##)
+        if (byHash != 0) byHash else s"$group".compareTo(s"${o.group}")
+      }
+    }
+  }
+}
+
+/**
+ * Default [[WindowRunner]].
+ *
+ * Each message is buffered under every bucket that `groupBy` assigns to it. On trigger,
+ * every buffered (bucket, group) whose bucket has ended is fed through the group's reduce
+ * function, and the results are emitted stamped with the trigger time.
+ *
+ * @param taskContext used to emit results downstream
+ * @param userConfig holds the reduce function under `GEARPUMP_STREAMING_OPERATOR`
+ * @param groupBy assigns each message a group and a list of window buckets
+ */
+class DefaultWindowRunner[IN, GROUP, OUT](
+    taskContext: TaskContext, userConfig: UserConfig,
+    groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
+  extends WindowRunner {
+
+  // Pending inputs, keyed first by bucket and then by group. Buckets have a total order
+  // (start time, then end time), which the sorted map needs to find the earliest window.
+  // Groups only support equals/hashCode. Keying the sorted map on (bucket, group) would
+  // need an ordering on arbitrary group types, so groups live in a hash map per bucket.
+  private val windowGroups = new TreeSortedMap[Bucket, UnifiedMap[GROUP, FastList[IN]]]
+  // One reduce function per group, read from userConfig the first time the group is seen.
+  private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]
+
+  override def process(message: Message): Unit = {
+    val (group, buckets) = groupBy.groupBy(message)
+    val input = message.msg.asInstanceOf[IN]
+    buckets.foreach { bucket =>
+      inputsFor(bucket, group).add(input)
+    }
+    // Check before reading. Passing the value to putIfAbsent would evaluate
+    // userConfig.getValue (presumably a deserialization) on every message, even for
+    // groups that already have a function.
+    if (!groupFns.containsKey(group)) {
+      groupFns.put(group,
+        userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get)
+    }
+  }
+
+  override def trigger(time: Instant): Unit = {
+    // Buckets are visited in (start, end) order; stop at the first one that has not ended.
+    @annotation.tailrec
+    def fireEndedBuckets(): Unit = {
+      if (windowGroups.notEmpty()) {
+        val bucket = windowGroups.firstKey
+        if (!time.isBefore(bucket.endTime)) {
+          val groups = windowGroups.remove(bucket)
+          val entries = groups.entrySet().iterator()
+          while (entries.hasNext) {
+            val entry = entries.next()
+            reduceAndEmit(entry.getKey, entry.getValue, time)
+          }
+          fireEndedBuckets()
+        }
+      }
+    }
+
+    fireEndedBuckets()
+  }
+
+  /** Returns the input buffer for (bucket, group), creating it and the bucket's group map on first use. */
+  private def inputsFor(bucket: Bucket, group: GROUP): FastList[IN] = {
+    val groups = Option(windowGroups.get(bucket)).getOrElse {
+      val created = new UnifiedMap[GROUP, FastList[IN]]
+      windowGroups.put(bucket, created)
+      created
+    }
+    Option(groups.get(group)).getOrElse {
+      val created = new FastList[IN](1)
+      groups.put(group, created)
+      created
+    }
+  }
+
+  /**
+   * Runs one group's buffered inputs through its reduce function and emits the results
+   * stamped with `time`. In [[Discarding]] mode the function's state is cleared afterwards.
+   */
+  private def reduceAndEmit(group: GROUP, inputs: FastList[IN], time: Instant): Unit = {
+    val reduceFn = groupFns.get(group)
+      .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
+    inputs.forEach(new Procedure[IN] {
+      override def value(t: IN): Unit = {
+        reduceFn.process(t)
+      }
+    })
+    reduceFn.finish()
+    if (groupBy.window.accumulationMode == Discarding) {
+      reduceFn.clearState()
+    }
+  }
+
+  private def emitResult(result: OUT, time: Instant): Unit = {
+    taskContext.output(Message(result, time.toEpochMilli))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index fb2d898..535497c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -23,7 +23,7 @@ import java.time.Instant
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator.{DummyInputFunction, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 /**
@@ -57,15 +57,10 @@ class DataSourceTask[IN, OUT] private[source](
   private val processMessage: Message => Unit =
     operator match {
       case Some(op) =>
-        op match {
-          case bad: DummyInputFunction[IN] =>
-            (message: Message) => context.output(message)
-          case _ =>
-            (message: Message) => {
-              op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT =>
-                context.output(Message(m, message.timestamp))
-              }
-            }
+        (message: Message) => {
+          op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT =>
+            context.output(Message(m, message.timestamp))
+          }
         }
       case None =>
         (message: Message) => context.output(message)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index eb52700..f72e5b8 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -60,7 +60,7 @@ class TaskActor(
   val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId)
 
   // Metrics
-  private val metricName = s"app$appId.processor${taskId.processorId}.task${taskId.index}"
+  private val metricName = s"app${appId}.processor${taskId.processorId}.task${taskId.index}"
   private val receiveLatency = Metrics(context.system).histogram(
     s"$metricName:receiveLatency", sampleRate = 1)
   private val processTime = Metrics(context.system).histogram(s"$metricName:processTime")
@@ -307,9 +307,9 @@ class TaskActor(
 
   private def updateUpstreamMinClock(upstreamClock: TimeStamp): Unit = {
     if (upstreamClock > this.upstreamMinClock) {
+      this.upstreamMinClock = upstreamClock
       task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock))
     }
-    this.upstreamMinClock = upstreamClock
 
     val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
       val subMin = sub._2.minClock

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
index e919a34..e0407ec 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
@@ -21,7 +21,10 @@ package org.apache.gearpump.streaming.dsl
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.TestUtil
 import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
 import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.util.Graph
 import org.mockito.Mockito.when
 import org.scalatest._
 import org.scalatest.mock.MockitoSugar
@@ -30,7 +33,7 @@ import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
 
-  implicit var system: ActorSystem = null
+  implicit var system: ActorSystem = _
 
   override def beforeAll(): Unit = {
     system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
@@ -45,49 +48,25 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M
     val context: ClientContext = mock[ClientContext]
     when(context.system).thenReturn(system)
 
-    val app = StreamApp("dsl", context)
-    app.source(List("A"), 1, "")
-    app.source(List("B"), 1, "")
+    val dsl = StreamApp("dsl", context)
+    dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]]
+    dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]]
 
-    assert(app.graph.vertices.size == 2)
-  }
-
-  it should "plan the dsl to Processsor(TaskDescription) DAG" in {
-    val context: ClientContext = mock[ClientContext]
-    when(context.system).thenReturn(system)
-
-    val app = StreamApp("dsl", context)
-    val parallism = 3
-    app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ + _)
-    val task = app.plan.dag.vertices.iterator.next()
-    assert(task.taskClass == classOf[DataSourceTask[_, _]].getName)
-    assert(task.parallelism == parallism)
-  }
-
-  it should "produce 3 messages" in {
-    val context: ClientContext = mock[ClientContext]
-    when(context.system).thenReturn(system)
-    val app = StreamApp("dsl", context)
-    val list = List[String](
-      "0",
-      "1",
-      "2"
-    )
-    val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_ + _)
-    val task = app.plan.dag.vertices.iterator.next()
-      /*
-      val task = app.plan.dag.vertices.iterator.map(desc => {
-        LOG.info(s"${desc.taskClass}")
-      })
-      val sum = producer.flatMap(msg => {
-        LOG.info("in flatMap")
-        assert(msg.msg.isInstanceOf[String])
-        val num = msg.msg.asInstanceOf[String].toInt
-        Array(num)
-      }).reduce(_+_)
-      val task = app.plan.dag.vertices.iterator.map(desc => {
-        LOG.info(s"${desc.taskClass}")
-      })
-     */
+    val application = dsl.plan()
+    application shouldBe a [StreamApplication]
+    application.name shouldBe "dsl"
+    val dag = application.userConfig
+      .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
+    dag.vertices.size shouldBe 2
+    dag.vertices.foreach { processor =>
+      processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
+      if (processor.description == "A") {
+        processor.parallelism shouldBe 2
+      } else if (processor.description == "B") {
+        processor.parallelism shouldBe 3
+      } else {
+        fail(s"undefined source ${processor.description}")
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
index 816feef..fdc721b 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
@@ -22,10 +22,11 @@ import akka.actor._
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner}
+import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
 import org.apache.gearpump.streaming.dsl.StreamSpec.Join
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
 import org.apache.gearpump.streaming.source.DataSourceTask
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
@@ -40,7 +41,6 @@ import scala.util.{Either, Left, Right}
 
 class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
 
-
   implicit var system: ActorSystem = _
 
   override def beforeAll(): Unit = {
@@ -56,7 +56,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
     val context: ClientContext = mock[ClientContext]
     when(context.system).thenReturn(system)
 
-    val app = StreamApp("dsl", context)
+    val dsl = StreamApp("dsl", context)
 
     val data =
       """
@@ -66,30 +66,32 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
         five  four
         five
       """
-    val stream = app.source(data.lines.toList, 1, "").
+    val stream = dsl.source(data.lines.toList, 1, "").
       flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty).
       map(word => (word, 1)).
       groupBy(_._1, parallelism = 2).
       reduce((left, right) => (left._1, left._2 + right._2)).
       map[Either[(String, Int), String]](Left(_))
 
-    val query = app.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
+    val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
     stream.merge(query).process[(String, Int)](classOf[Join], 1)
 
-    val appDescription = app.plan()
+    val app: StreamApplication = dsl.plan()
+    val dag = app.userConfig
+      .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
 
-    val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) =>
+    val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) =>
       edge.partitionerFactory.partitioner.getClass.getName
     }
     val expectedDagTopology = getExpectedDagTopology
 
-    assert(dagTopology.vertices.toSet.equals(expectedDagTopology.vertices.toSet))
-    assert(dagTopology.edges.toSet.equals(expectedDagTopology.edges.toSet))
+    dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet
+    dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet
   }
 
   private def getExpectedDagTopology: Graph[String, String] = {
     val source = classOf[DataSourceTask[_, _]].getName
-    val group = classOf[GroupByTask[_, _, _]].getName
+    val group = classOf[CountTriggerTask[_, _]].getName
     val merge = classOf[TransformTask[_, _]].getName
     val join = classOf[Join].getName
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
index fcc646d..f49eb04 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
@@ -18,24 +18,33 @@
 
 package org.apache.gearpump.streaming.dsl.partitioner
 
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import java.time.Duration
 
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 import org.apache.gearpump.Message
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People
+import org.apache.gearpump.streaming.dsl.window.api.{FixedWindow, GroupByFn}
+import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow}
 
 class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-  it should "use the outpout of groupBy function to do partition" in {
+  // Key = (gender, window buckets); the test expects co-location only when both match.
+  it should "group by message payload and window" in {
     val mark = People("Mark", "male")
     val tom = People("Tom", "male")
     val michelle = People("Michelle", "female")
 
     val partitionNum = 10
-    val groupBy = new GroupByPartitioner[People, String](_.gender)
-    assert(groupBy.getPartition(Message(mark), partitionNum)
-      == groupBy.getPartition(Message(tom), partitionNum))
+    val windowedGender: GroupByFn[People, (String, List[Bucket])] =
+      GroupAlsoByWindow[People, String](_.gender, FixedWindow(Duration.ofMillis(5)))
+    val partitioner = new GroupByPartitioner[People, (String, List[Bucket])](windowedGender)
+    def partitionOf(person: People, timestamp: Long) =
+      partitioner.getPartition(Message(person, timestamp), partitionNum)
+    // Same gender at timestamps 1 and 2 co-locate; timestamp 6 is expected in another window.
+    partitionOf(mark, 1L) shouldBe partitionOf(tom, 2L)
+    partitionOf(mark, 1L) should not be partitionOf(tom, 6L)
 
-    assert(groupBy.getPartition(Message(mark), partitionNum)
-      != groupBy.getPartition(Message(michelle), partitionNum))
+    // A different gender is expected in a different partition, even within one window.
+    partitionOf(mark, 2L) should not be partitionOf(michelle, 3L)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
new file mode 100644
index 0000000..bf52abc
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.plan
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.Processor.DefaultProcessor
+import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask}
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
+  // One mock per non-chainable Op kind; every "chain" test below expects each to be rejected.
+  private val unchainableOps: List[Op] = List(
+    mock[DataSourceOp],
+    mock[DataSinkOp],
+    mock[GroupByOp[Any, Any]],
+    mock[MergeOp],
+    mock[ProcessorOp[AnyTask]])
+  // Implicit ActorSystem in scope for the Op calls below; created/terminated once per suite.
+  implicit var system: ActorSystem = _
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  "DataSourceOp" should {
+
+    "chain ChainableOp" in {
+      val dataSource = new AnySource
+      val dataSourceOp = DataSourceOp(dataSource)
+      val chainableOp = mock[ChainableOp[Any, Any]]
+      // chainableOp.fn is not stubbed, so Mockito's default answer returns null for it.
+
+      val chainedOp = dataSourceOp.chain(chainableOp)
+
+      chainedOp shouldBe a[DataSourceOp]
+      verify(chainableOp).fn
+
+      unchainableOps.foreach { op =>
+        intercept[OpChainException] {
+          dataSourceOp.chain(op)
+        }
+      }
+    }
+
+    "get Processor of DataSource" in {
+      val dataSource = new AnySource
+      val dataSourceOp = DataSourceOp(dataSource)
+      val processor = dataSourceOp.getProcessor
+      processor shouldBe a[Processor[_]]
+      processor.parallelism shouldBe dataSourceOp.parallelism
+      processor.description shouldBe dataSourceOp.description
+    }
+  }
+
+  "DataSinkOp" should {
+
+    "not chain any Op" in {
+      val dataSink = new AnySink
+      val dataSinkOp = DataSinkOp(dataSink)
+      val chainableOp = mock[ChainableOp[Any, Any]]
+      val ops = chainableOp +: unchainableOps
+      ops.foreach { op =>
+        intercept[OpChainException] {
+          dataSinkOp.chain(op)
+        }
+      }
+    }
+
+    "get Processor of DataSink" in {
+      val dataSink = new AnySink
+      val dataSinkOp = DataSinkOp(dataSink)
+      val processor = dataSinkOp.getProcessor
+      processor shouldBe a[Processor[_]]
+      processor.parallelism shouldBe dataSinkOp.parallelism
+      processor.description shouldBe dataSinkOp.description
+    }
+  }
+
+  "ProcessorOp" should {
+
+    "not chain any Op" in {
+      val processorOp = new ProcessorOp[AnyTask]
+      val chainableOp = mock[ChainableOp[Any, Any]]
+      val ops = chainableOp +: unchainableOps
+      ops.foreach { op =>
+        intercept[OpChainException] {
+          processorOp.chain(op)
+        }
+      }
+    }
+
+    "get Processor" in {
+      val processorOp = new ProcessorOp[AnyTask]
+      val processor = processorOp.getProcessor
+      processor shouldBe a [DefaultProcessor[_]]
+      processor.parallelism shouldBe processorOp.parallelism
+      processor.description shouldBe processorOp.description
+    }
+  }
+
+  "ChainableOp" should {
+
+    "chain ChainableOp" in {
+      val fn1 = mock[SingleInputFunction[Any, Any]]
+      val chainableOp1 = ChainableOp[Any, Any](fn1)
+
+      val fn2 = mock[SingleInputFunction[Any, Any]]
+      val chainableOp2 = ChainableOp[Any, Any](fn2)
+
+      val chainedOp = chainableOp1.chain(chainableOp2)
+
+      verify(fn1).andThen(fn2)
+      chainedOp shouldBe a[ChainableOp[_, _]]
+
+      unchainableOps.foreach { op =>
+        intercept[OpChainException] {
+          chainableOp1.chain(op)
+        }
+      }
+    }
+
+    "throw exception on getProcessor" in {
+      val fn1 = mock[SingleInputFunction[Any, Any]]
+      val chainableOp1 = ChainableOp[Any, Any](fn1)
+      intercept[UnsupportedOperationException] {
+        chainableOp1.getProcessor
+      }
+    }
+  }
+
+  "GroupByOp" should {
+
+    "chain ChainableOp" in {
+      val groupByFn = mock[GroupByFn[Any, Any]]
+      val groupByOp = GroupByOp[Any, Any](groupByFn)
+      val fn = mock[SingleInputFunction[Any, Any]]
+      val chainableOp = mock[ChainableOp[Any, Any]]
+      when(chainableOp.fn).thenReturn(fn)
+
+      val chainedOp = groupByOp.chain(chainableOp)
+      chainedOp shouldBe a[GroupByOp[_, _]]
+
+      unchainableOps.foreach { op =>
+        intercept[OpChainException] {
+          groupByOp.chain(op)
+        }
+      }
+    }
+
+    "delegate to groupByFn on getProcessor" in {
+      val groupByFn = mock[GroupByFn[Any, Any]]
+      val groupByOp = GroupByOp[Any, Any](groupByFn)
+
+      groupByOp.getProcessor
+      verify(groupByFn).getProcessor(anyInt, anyString, any[UserConfig])(any[ActorSystem])
+    }
+  }
+
+  "MergeOp" should {
+
+    val mergeOp = MergeOp("merge")
+
+    "chain ChainableOp" in {
+      val fn = mock[SingleInputFunction[Any, Any]]
+      val chainableOp = mock[ChainableOp[Any, Any]]
+      when(chainableOp.fn).thenReturn(fn)
+
+      val chainedOp = mergeOp.chain(chainableOp)
+      chainedOp shouldBe a [MergeOp]
+
+      unchainableOps.foreach { op =>
+        intercept[OpChainException] {
+          mergeOp.chain(op)
+        }
+      }
+    }
+
+    "get Processor" in {
+      val processor = mergeOp.getProcessor
+      processor shouldBe a[Processor[_]]
+      processor.parallelism shouldBe 1
+    }
+  }
+}
+
+object OpSpec {
+
+  /** Task that does nothing; only its class is needed to build a ProcessorOp. */
+  class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config)
+
+  /**
+   * DataSource whose open/close are no-ops, whose read always returns Message("any"),
+   * and whose watermark is the current wall-clock time.
+   */
+  class AnySource extends DataSource {
+    override def open(context: TaskContext, startTime: Instant): Unit = {}
+    override def read(): Message = Message("any")
+    override def close(): Unit = {}
+    override def getWatermark: Instant = Instant.now()
+  }
+
+  /** DataSink whose open/close are no-ops and which discards every message written. */
+  class AnySink extends DataSink {
+    override def open(context: TaskContext): Unit = {}
+    override def write(message: Message): Unit = {}
+    override def close(): Unit = {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
deleted file mode 100644
index 2112fd0..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.plan
-
-import java.time.Instant
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import akka.actor.ActorSystem
-import org.mockito.ArgumentCaptor
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest._
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.source.DataSourceTask
-
-class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-
-  "andThen" should "chain multiple single input function" in {
-    val dummy = new DummyInputFunction[String]
-    val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")
-
-    val filter = new FlatMapFunction[String, String](word =>
-      if (word.isEmpty) None else Some(word), "filter")
-
-    val map = new FlatMapFunction[String, Int](word => Some(1), "map")
-
-    val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")
-
-    val all = dummy.andThen(split).andThen(filter).andThen(map).andThen(sum)
-
-    assert(all.description == "split.filter.map.sum")
-
-    val data =
-      """
-      five  four three  two    one
-      five  four three  two
-      five  four three
-      five  four
-      five
-      """
-    val count = all.process(data).toList.last
-    assert(count == 15)
-  }
-
-  "Source" should "iterate over input source and apply attached operator" in {
-
-    val taskContext = MockUtil.mockTaskContext
-    implicit val actorSystem = MockUtil.system
-
-    val data = "one two three".split("\\s")
-    val dataSource = new CollectionDataSource[String](data)
-    val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
-
-    // Source with no transformer
-    val source = new DataSourceTask[String, String](
-      taskContext, conf)
-    source.onStart(Instant.EPOCH)
-    source.onNext(Message("next"))
-    data.foreach { s =>
-      verify(taskContext, times(1)).output(Message(s))
-    }
-
-    // Source with transformer
-    val anotherTaskContext = MockUtil.mockTaskContext
-    val double = new FlatMapFunction[String, String](word => List(word, word), "double")
-    val another = new DataSourceTask(anotherTaskContext,
-      conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
-    another.onStart(Instant.EPOCH)
-    another.onNext(Message("next"))
-    data.foreach { s =>
-      verify(anotherTaskContext, times(2)).output(Message(s))
-    }
-  }
-
-  "GroupByTask" should "group input by groupBy Function and " +
-    "apply attached operator for each group" in {
-
-    val data = "1 2  2  3 3  3"
-
-    val concat = new ReduceFunction[String]({ (left, right) =>
-      left + right
-    }, "concat")
-
-    implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-    val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](
-    GEARPUMP_STREAMING_OPERATOR, concat)
-
-    val taskContext = MockUtil.mockTaskContext
-
-    val task = new GroupByTask[String, String, String](input => input, taskContext, config)
-    task.onStart(Instant.EPOCH)
-
-    val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
-
-    data.split("\\s+").foreach { word =>
-      task.onNext(Message(word))
-    }
-    verify(taskContext, times(6)).output(peopleCaptor.capture())
-
-    import scala.collection.JavaConverters._
-
-    val values = peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String])
-    assert(values.mkString(",") == "1,2,22,3,33,333")
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  "MergeTask" should "accept two stream and apply the attached operator" in {
-
-    // Source with transformer
-    val taskContext = MockUtil.mockTaskContext
-    val conf = UserConfig.empty
-    val double = new FlatMapFunction[String, String](word => List(word, word), "double")
-    val task = new TransformTask[String, String](Some(double), taskContext, conf)
-    task.onStart(Instant.EPOCH)
-
-    val data = "1 2  2  3 3  3".split("\\s+")
-
-    data.foreach { input =>
-      task.onNext(Message(input))
-    }
-
-    verify(taskContext, times(data.length * 2)).output(anyObject())
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
new file mode 100644
index 0000000..f8666ba
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.plan
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.partitioner.CoLocationPartitioner
+import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
+import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.{MockUtil, Processor}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
+  // Suite-wide ActorSystem; also the implicit ActorSystem in scope for planner.plan below.
+  implicit var system: ActorSystem = _
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  "Planner" should "chain operations" in {
+    val graph = Graph.empty[Op, OpEdge]
+    val sourceOp = DataSourceOp(new AnySource)
+    val groupByOp = GroupByOp(new AnyGroupByFn)
+    val flatMapOp = ChainableOp[Any, Any](anyFlatMapFunction)
+    val reduceOp = ChainableOp[Any, Any](anyReduceFunction)
+    val processorOp = new ProcessorOp[AnyTask]
+    val sinkOp = DataSinkOp(new AnySink)
+    val directEdge = Direct
+    val shuffleEdge = Shuffle
+    // Graph under test: source -(shuffle)-> groupBy -> flatMap -> reduce -> processor -> sink
+    graph.addVertex(sourceOp)
+    graph.addVertex(groupByOp)
+    graph.addEdge(sourceOp, shuffleEdge, groupByOp)
+    graph.addVertex(flatMapOp)
+    graph.addEdge(groupByOp, directEdge, flatMapOp)
+    graph.addVertex(reduceOp)
+    graph.addEdge(flatMapOp, directEdge, reduceOp)
+    graph.addVertex(processorOp)
+    graph.addEdge(reduceOp, directEdge, processorOp)
+    graph.addVertex(sinkOp)
+    graph.addEdge(processorOp, directEdge, sinkOp)
+
+    // Use the suite-managed implicit system instead of shadowing it with another ActorSystem.
+    val planner = new Planner
+    val plan = planner.plan(graph)
+      .mapVertex(_.description)
+
+    // flatMap and reduce are expected to be chained away, leaving four processors.
+    plan.vertices.toSet should contain theSameElementsAs
+      Set("source", "groupBy", "processor", "sink")
+    plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]]
+    plan.outgoingEdgesOf("groupBy").iterator.next()._2 shouldBe a[CoLocationPartitioner]
+    plan.outgoingEdgesOf("processor").iterator.next()._2 shouldBe a[CoLocationPartitioner]
+  }
+}
+
+object PlannerSpec {
+  private val fixedParallelism = 1
+  private val anyFlatMapFunction = new FlatMapFunction[Any, Any](x => Option(x), "flatMap")
+  private val anyReduceFunction =
+    new ReduceFunction[Any]((l: Any, r: Any) => (l, r), "reduce")
+
+  /** Task that does nothing; only its class is referenced by the spec. */
+  class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config)
+
+  /** Source returning Message("any") on every read; its watermark is wall-clock time. */
+  class AnySource extends DataSource {
+    override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+    override def read(): Message = Message("any")
+
+    override def close(): Unit = {}
+
+    override def getWatermark: Instant = Instant.now()
+  }
+
+  /** Sink that discards every message written to it. */
+  class AnySink extends DataSink {
+    override def open(context: TaskContext): Unit = {}
+
+    override def write(message: Message): Unit = {}
+
+    override def close(): Unit = {}
+  }
+
+  /** GroupByFn keyed on the raw payload, planned as a single-instance AnyTask processor. */
+  class AnyGroupByFn extends GroupByFn[Any, Any] {
+    override def groupBy(message: Message): Any = message.msg
+
+    // The requested parallelism is ignored; fixedParallelism is always used.
+    override def getProcessor(
+        parallelism: Int,
+        description: String,
+        userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] =
+      Processor[AnyTask](fixedParallelism, description)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
new file mode 100644
index 0000000..94feae4
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.plan.functions
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
+import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{Matchers, WordSpec}
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {
+  import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunctionSpec._
+
+  "AndThen" should {
+
+    // These mocks are shared by every test in this group.
+    val first = mock[SingleInputFunction[R, S]]
+    val second = mock[SingleInputFunction[S, T]]
+    val andThen = new AndThen(first, second)
+
+    "chain first and second functions when processing input value" in {
+      val input = mock[R]
+      val firstOutput = mock[S]
+      val secondOutput = mock[T]
+      when(first.process(input)).thenReturn(Some(firstOutput))
+      when(second.process(firstOutput)).thenReturn(Some(secondOutput))
+
+      andThen.process(input).toList shouldBe List(secondOutput)
+    }
+
+    "return chained description" in {
+      when(first.description).thenReturn("first")
+      when(second.description).thenReturn("second")
+      andThen.description shouldBe "first.second"
+    }
+
+    "return either first result or second on finish" in {
+      val firstResult = mock[S]
+      val processedFirst = mock[T]
+      val secondResult = mock[T]
+
+      // first has a pending result: it is fed through second
+      when(first.finish()).thenReturn(Some(firstResult))
+      when(second.process(firstResult)).thenReturn(Some(processedFirst))
+      andThen.finish().toList shouldBe List(processedFirst)
+
+      // first has nothing pending: second's own finish result is returned
+      when(first.finish()).thenReturn(None)
+      when(second.finish()).thenReturn(Some(secondResult))
+      andThen.finish().toList shouldBe List(secondResult)
+    }
+
+    "clear both states on clearState" in {
+      andThen.clearState()
+
+      verify(first).clearState()
+      verify(second).clearState()
+    }
+
+    "return AndThen on andThen" in {
+      val third = mock[SingleInputFunction[T, Any]]
+      andThen.andThen[Any](third) shouldBe an [AndThen[_, _, _]]
+    }
+  }
+
+  "FlatMapFunction" should {
+
+    val flatMap = mock[R => TraversableOnce[S]]
+    val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap")
+
+    "call flatMap function when processing input value" in {
+      val input = mock[R]
+      flatMapFunction.process(input)
+      verify(flatMap).apply(input)
+    }
+
+    "return passed in description" in {
+      flatMapFunction.description shouldBe "flatMap"
+    }
+
+    "return empty result on finish" in {
+      flatMapFunction.finish() shouldBe List.empty[S]
+    }
+
+    "do nothing on clearState" in {
+      flatMapFunction.clearState()
+      verifyZeroInteractions(flatMap)
+    }
+
+    "return AndThen on andThen" in {
+      val other = mock[SingleInputFunction[S, T]]
+      flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]]
+    }
+  }
+
+  "ReduceFunction" should {
+
+    "call reduce function when processing input value" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      val input1 = mock[T]
+      val input2 = mock[T]
+      val output = mock[T]
+
+      when(reduce.apply(input1, input2)).thenReturn(output, output)
+
+      // results are buffered until finish()
+      reduceFunction.process(input1) shouldBe List.empty[T]
+      reduceFunction.process(input2) shouldBe List.empty[T]
+      reduceFunction.finish() shouldBe List(output)
+
+      // clearState drops the accumulated value, so only input2 remains
+      reduceFunction.clearState()
+      reduceFunction.process(input1) shouldBe List.empty[T]
+      reduceFunction.clearState()
+      reduceFunction.process(input2) shouldBe List.empty[T]
+      reduceFunction.finish() shouldBe List(input2)
+    }
+
+    "return passed in description" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      reduceFunction.description shouldBe "reduce"
+    }
+
+    "return empty result on finish" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      reduceFunction.finish() shouldBe List.empty[T]
+    }
+
+    "do nothing on clearState" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      reduceFunction.clearState()
+      verifyZeroInteractions(reduce)
+    }
+
+    "return AndThen on andThen" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      val other = mock[SingleInputFunction[T, Any]]
+      reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]]
+    }
+  }
+
+  "EmitFunction" should {
+
+    val emit = mock[T => Unit]
+    val emitFunction = new EmitFunction[T](emit)
+
+    "emit input value when processing input value" in {
+      val input = mock[T]
+
+      emitFunction.process(input) shouldBe List.empty[Unit]
+
+      verify(emit).apply(input)
+    }
+
+    "return empty description" in {
+      emitFunction.description shouldBe ""
+    }
+
+    "return empty result on finish" in {
+      emitFunction.finish() shouldBe List.empty[Unit]
+    }
+
+    "do nothing on clearState" in {
+      emitFunction.clearState()
+      verifyZeroInteractions(emit)
+    }
+
+    "throw exception on andThen" in {
+      val other = mock[SingleInputFunction[Unit, Any]]
+      intercept[UnsupportedOperationException] {
+        emitFunction.andThen(other)
+      }
+    }
+  }
+
+  "andThen" should {
+    "chain multiple single input function" in {
+      val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")
+
+      val filter = new FlatMapFunction[String, String](word =>
+        if (word.isEmpty) None else Some(word), "filter")
+
+      val map = new FlatMapFunction[String, Int](_ => Some(1), "map")
+
+      val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")
+
+      val all = split.andThen(filter).andThen(map).andThen(sum)
+
+      all.description shouldBe "split.filter.map.sum"
+
+      val data =
+        """
+      five  four three  two    one
+      five  four three  two
+      five  four three
+      five  four
+      five
+        """
+      // force eager evaluation
+      all.process(data).toList
+      val result = all.finish().toList
+      result should not be empty
+      result.last shouldBe 15
+    }
+  }
+
+  "Source" should {
+    "iterate over input source and apply attached operator" in {
+
+      val taskContext = MockUtil.mockTaskContext
+      implicit val actorSystem = MockUtil.system
+
+      val data = "one two three".split("\\s")
+      val dataSource = new CollectionDataSource[String](data)
+      val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
+
+      // Source with no transformer
+      val source = new DataSourceTask[String, String](
+        taskContext, conf)
+      source.onStart(Instant.EPOCH)
+      source.onNext(Message("next"))
+      data.foreach { s =>
+        verify(taskContext, times(1)).output(MockUtil.argMatch[Message](
+          message => message.msg == s))
+      }
+
+      // Source with transformer
+      val anotherTaskContext = MockUtil.mockTaskContext
+      val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+      val another = new DataSourceTask(anotherTaskContext,
+        conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
+      another.onStart(Instant.EPOCH)
+      another.onNext(Message("next"))
+      data.foreach { s =>
+        verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message](
+          message => message.msg == s))
+      }
+    }
+  }
+
+  "CountTriggerTask" should {
+    "group input by groupBy Function and " +
+      "apply attached operator for each group" in {
+
+      val data = "1 2  2  3 3  3"
+
+      val concat = new ReduceFunction[String]({ (left, right) =>
+        left + right
+      }, "concat")
+
+      implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+      try {
+        val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](
+          GEARPUMP_STREAMING_OPERATOR, concat)
+
+        val taskContext = MockUtil.mockTaskContext
+
+        val groupBy =
+          GroupAlsoByWindow((input: String) => input, CountWindow.apply(1).accumulating)
+        val task = new CountTriggerTask[String, String](groupBy, taskContext, config)
+        task.onStart(Instant.EPOCH)
+
+        val messageCaptor = ArgumentCaptor.forClass(classOf[Message])
+
+        data.split("\\s+").foreach { word =>
+          task.onNext(Message(word))
+        }
+        verify(taskContext, times(6)).output(messageCaptor.capture())
+
+        import scala.collection.JavaConverters._
+
+        val values =
+          messageCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String])
+        values.mkString(",") shouldBe "1,2,22,3,33,333"
+      } finally {
+        // shut the actor system down even when an assertion above fails
+        system.terminate()
+        Await.result(system.whenTerminated, Duration.Inf)
+      }
+    }
+  }
+
+  "TransformTask" should {
+    "apply the attached operator to every input message" in {
+
+      // TransformTask with a flatMap operator that emits each word twice
+      val taskContext = MockUtil.mockTaskContext
+      val conf = UserConfig.empty
+      val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+      val task = new TransformTask[String, String](Some(double), taskContext, conf)
+      task.onStart(Instant.EPOCH)
+
+      val data = "1 2  2  3 3  3".split("\\s+")
+
+      data.foreach { input =>
+        task.onNext(Message(input))
+      }
+
+      verify(taskContext, times(data.length * 2)).output(anyObject())
+    }
+  }
+}
+
+object SingleInputFunctionSpec {
+  // Stand-in element types for the chained functions (input R, intermediate S, output T).
+  // The spec creates Mockito mocks of them, e.g. mock[R]; all alias AnyRef.
+  type R = AnyRef
+  type S = AnyRef
+  type T = AnyRef
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
new file mode 100644
index 0000000..871d751
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class CountTriggerTaskSpec extends PropSpec with PropertyChecks
+  with Matchers with MockitoSugar {
+
+  property("CountTriggerTask should trigger output by number of messages in a window") {
+
+    implicit val system = MockUtil.system
+
+    val counts = Gen.chooseNum[Int](1, 1000)
+
+    forAll(counts, counts) { (windowSize: Int, msgNum: Int) =>
+      val window = CountWindow.apply(windowSize)
+      val groupBy = mock[GroupAlsoByWindow[Any, Any]]
+      when(groupBy.window).thenReturn(window)
+      val windowRunner = mock[WindowRunner]
+
+      val task = new CountTriggerTask[Any, Any](groupBy, windowRunner,
+        MockUtil.mockTaskContext, UserConfig.empty)
+      val message = mock[Message]
+
+      (1 to msgNum).foreach(_ => task.onNext(message))
+
+      // every message is handed to the window runner ...
+      verify(windowRunner, times(msgNum)).process(message)
+      // ... and one trigger fires per complete window of `windowSize` messages
+      verify(windowRunner, times(msgNum / windowSize)).trigger(Instant.ofEpochMilli(windowSize))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
new file mode 100644
index 0000000..a69abe6
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+class EventTimeTriggerTaskSpec extends PropSpec with PropertyChecks
+  with Matchers with MockitoSugar {
+
+  property("EventTimeTriggerTask should trigger on watermark") {
+    val millis = Gen.chooseNum[Long](1L, 1000L)
+
+    forAll(millis, millis, millis.map(Instant.ofEpochMilli)) {
+      (windowSize: Long, windowStep: Long, watermark: Instant) =>
+        val window = SlidingWindow.apply(Duration.ofMillis(windowSize),
+          Duration.ofMillis(windowStep)).triggering(EventTimeTrigger)
+        val groupBy = mock[GroupAlsoByWindow[Any, Any]]
+        when(groupBy.window).thenReturn(window)
+        val windowRunner = mock[WindowRunner]
+
+        val task = new EventTimeTriggerTask[Any, Any](groupBy, windowRunner,
+          MockUtil.mockTaskContext, UserConfig.empty)
+
+        // an incoming message is handed to the window runner ...
+        val message = mock[Message]
+        task.onNext(message)
+        verify(windowRunner).process(message)
+
+        // ... and watermark progress fires the runner's trigger
+        task.onWatermarkProgress(watermark)
+        verify(windowRunner).trigger(any[Instant])
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
new file mode 100644
index 0000000..39e1b4c
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
+import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+class ProcessingTimeTriggerTaskSpec extends PropSpec with PropertyChecks
+  with Matchers with MockitoSugar {
+
+  property("ProcessingTimeTriggerTask should trigger on system time interval") {
+    val millis = Gen.chooseNum[Long](1L, 1000L)
+
+    forAll(millis, millis, millis.map(Instant.ofEpochMilli)) {
+      (windowSize: Long, windowStep: Long, startTime: Instant) =>
+        val window = SlidingWindow.apply(Duration.ofMillis(windowSize),
+          Duration.ofMillis(windowStep)).triggering(ProcessingTimeTrigger)
+        val groupBy = mock[GroupAlsoByWindow[Any, Any]]
+        when(groupBy.window).thenReturn(window)
+        val windowRunner = mock[WindowRunner]
+
+        val task = new ProcessingTimeTriggerTask[Any, Any](groupBy, windowRunner,
+          MockUtil.mockTaskContext, UserConfig.empty)
+        task.onStart(startTime)
+
+        // an incoming message is handed to the window runner ...
+        val message = mock[Message]
+        task.onNext(message)
+        verify(windowRunner).process(message)
+
+        // ... and the Triggering tick fires the runner's trigger
+        task.receiveUnManagedMessage(Triggering)
+        verify(windowRunner).trigger(any[Instant])
+    }
+  }
+
+}


[2/2] incubator-gearpump git commit: Merge branch 'master' into akka-streams

Posted by ma...@apache.org.
Merge branch 'master' into akka-streams

Author: manuzhang <ow...@gmail.com>
Author: darionyaphet <da...@gmail.com>

Closes #95 from manuzhang/akka-streams.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/bc394035
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/bc394035
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/bc394035

Branch: refs/heads/akka-streams
Commit: bc39403525b4065360acf82386fac21a588b59e6
Parents: 4fe5458
Author: manuzhang <ow...@gmail.com>
Authored: Tue Oct 11 11:57:48 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Oct 11 11:57:48 2016 +0800

----------------------------------------------------------------------
 .../wordcount/dsl/WindowedWordCount.scala       |  87 ++++
 .../apache/gearpump/redis/RedisMessage.scala    | 456 +++++++++++++++++++
 .../org/apache/gearpump/redis/RedisSink.scala   | 119 +++++
 project/Build.scala                             |  62 ++-
 project/BuildShaded.scala                       | 127 +++---
 .../apache/gearpump/streaming/Constants.scala   |   1 +
 .../gearpump/streaming/StreamApplication.scala  |   2 +-
 .../apache/gearpump/streaming/dsl/Stream.scala  | 106 +++--
 .../gearpump/streaming/dsl/StreamApp.scala      |  34 +-
 .../streaming/dsl/javaapi/JavaStream.scala      |  22 +-
 .../apache/gearpump/streaming/dsl/op/OP.scala   | 109 -----
 .../dsl/partitioner/GroupByPartitioner.scala    |  49 ++
 .../dsl/partitioner/GroupbyPartitioner.scala    |  46 --
 .../apache/gearpump/streaming/dsl/plan/OP.scala | 214 +++++++++
 .../streaming/dsl/plan/OpTranslator.scala       | 222 ---------
 .../gearpump/streaming/dsl/plan/Planner.scala   |  65 ++-
 .../plan/functions/SingleInputFunction.scala    | 107 +++++
 .../streaming/dsl/task/CountTriggerTask.scala   |  63 +++
 .../dsl/task/EventTimeTriggerTask.scala         |  59 +++
 .../dsl/task/ProcessingTimeTriggerTask.scala    |  82 ++++
 .../streaming/dsl/task/TransformTask.scala      |  47 ++
 .../dsl/window/api/AccumulationMode.scala       |  24 +
 .../streaming/dsl/window/api/GroupByFn.scala    |  47 ++
 .../streaming/dsl/window/api/Trigger.scala      |  27 ++
 .../streaming/dsl/window/api/Window.scala       |  77 ++++
 .../streaming/dsl/window/api/WindowFn.scala     |  63 +++
 .../dsl/window/impl/ReduceFnRunner.scala        |  29 ++
 .../streaming/dsl/window/impl/Window.scala      |  75 +++
 .../dsl/window/impl/WindowRunner.scala          | 114 +++++
 .../streaming/source/DataSourceTask.scala       |  15 +-
 .../gearpump/streaming/task/TaskActor.scala     |   4 +-
 .../gearpump/streaming/dsl/StreamAppSpec.scala  |  67 +--
 .../gearpump/streaming/dsl/StreamSpec.scala     |  24 +-
 .../partitioner/GroupByPartitionerSpec.scala    |  23 +-
 .../gearpump/streaming/dsl/plan/OpSpec.scala    | 244 ++++++++++
 .../streaming/dsl/plan/OpTranslatorSpec.scala   | 148 ------
 .../streaming/dsl/plan/PlannerSpec.scala        | 132 ++++++
 .../functions/SingleInputFunctionSpec.scala     | 333 ++++++++++++++
 .../dsl/task/CountTriggerTaskSpec.scala         |  61 +++
 .../dsl/task/EventTimeTriggerTaskSpec.scala     |  66 +++
 .../task/ProcessingTimeTriggerTaskSpec.scala    |  69 +++
 41 files changed, 2937 insertions(+), 784 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
new file mode 100644
index 0000000..4f43fd4
--- /dev/null
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.wordcount.dsl
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp}
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow}
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.AkkaApp
+
+object WindowedWordCount extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  /**
+   * Builds and submits a DSL word-count application that sums per-word counts
+   * within 5 ms fixed windows triggered on event time, writing results to a LoggerSink.
+   *
+   * @param akkaConf configuration used to create the ClientContext
+   * @param args     command-line arguments (no options are defined)
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+    try {
+      val app = StreamApp("dsl", context)
+      app.source[String](new TimedDataSource).
+        // line => words, then word => (word, 1)
+        flatMap(line => line.split("[\\s]+")).map((_, 1)).
+        // fixed 5 ms windows fired by the event-time trigger
+        window(FixedWindow.apply(Duration.ofMillis(5L))
+          .triggering(EventTimeTrigger)).
+        // (word, count1), (word, count2) => (word, count1 + count2)
+        groupBy(_._1).
+        sum.sink(new LoggerSink)
+
+      context.submit(app)
+    } finally {
+      // release the client context even if building or submitting the app throws
+      context.close()
+    }
+  }
+
+  /**
+   * Emits a fixed list of timestamped words, one per `read()` call.
+   * The watermark follows the timestamp of the last message read.
+   */
+  private class TimedDataSource extends DataSource {
+
+    private var data = List(
+      Message("foo", 1L),
+      Message("bar", 2L),
+      Message("foo", 3L),
+      Message("foo", 5L),
+      Message("bar", 7L),
+      Message("bar", 8L)
+    )
+
+    private var watermark: Instant = Instant.ofEpochMilli(0)
+
+    /**
+     * Returns the next message and advances the watermark to its timestamp,
+     * or null once the list is exhausted.
+     * NOTE(review): assumes the source task treats null as "no message available" — confirm.
+     */
+    override def read(): Message = {
+      if (data.nonEmpty) {
+        val msg = data.head
+        data = data.tail
+        watermark = Instant.ofEpochMilli(msg.timestamp)
+        msg
+      } else {
+        null
+      }
+    }
+
+    override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+    override def close(): Unit = {}
+
+    /**
+     * Current watermark. Once all data has been read, each call advances it by 1 ms,
+     * presumably so that the trailing windows eventually close and fire.
+     */
+    override def getWatermark: Instant = {
+      if (data.isEmpty) {
+        watermark = watermark.plusMillis(1)
+      }
+      watermark
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
new file mode 100644
index 0000000..84dec70
--- /dev/null
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.redis
+
+import java.nio.charset.Charset
+
+object RedisMessage {
+
+  // Charset for every String -> Array[Byte] conversion below; resolved once instead of per call.
+  private val Utf8: Charset = Charset.forName("UTF8")
+
+  /** Encodes every string in `strings` as UTF-8 bytes, preserving order. */
+  private def toBytes(strings: List[String]): List[Array[Byte]] =
+    strings.map(string => string.getBytes(Utf8))
+
+  /** Encodes `string` as UTF-8 bytes. */
+  private def toBytes(string: String): Array[Byte] =
+    string.getBytes(Utf8)
+
+  object Connection {
+
+    /**
+     * Change the selected database for the current connection
+     *
+     * @param index index of the database to switch to
+     */
+    case class SELECT(index: Int)
+
+  }
+
+  object Geo {
+
+    /**
+     * Add one geospatial item in the geospatial index represented using a sorted set
+     *
+     * @param key key of the sorted set holding the geospatial index
+     * @param longitude longitude of the item
+     * @param latitude latitude of the item
+     * @param member name of the member to add
+     */
+    case class GEOADD(key: Array[Byte], longitude: Double,
+                      latitude: Double, member: Array[Byte]) {
+      def this(key: String, longitude: Double,
+               latitude: Double, member: String) =
+        this(toBytes(key), longitude, latitude, toBytes(member))
+    }
+
+  }
+
+  object Hashes {
+
+    /**
+     * Delete a hash field
+     *
+     * @param key key of the hash
+     * @param field field to delete
+     */
+    case class HDEL(key: Array[Byte], field: Array[Byte]) {
+      def this(key: String, field: String) = this(toBytes(key), toBytes(field))
+    }
+
+    /**
+     * Increment the integer value of a hash field by the given number
+     *
+     * @param key key of the hash
+     * @param field field whose value is incremented
+     * @param increment amount to add
+     */
+    case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) {
+      def this(key: String, field: String, increment: Long) =
+        this(toBytes(key), toBytes(field), increment)
+    }
+
+    /**
+     * Increment the float value of a hash field by the given amount
+     *
+     * @param key key of the hash
+     * @param field field whose value is incremented
+     * @param increment amount to add
+     */
+    case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) {
+      def this(key: String, field: String, increment: Float) =
+        this(toBytes(key), toBytes(field), increment)
+    }
+
+
+    /**
+     * Set the string value of a hash field
+     *
+     * @param key key of the hash
+     * @param field field to set
+     * @param value value to store
+     */
+    case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
+      def this(key: String, field: String, value: String) =
+        this(toBytes(key), toBytes(field), toBytes(value))
+    }
+
+    /**
+     * Set the value of a hash field, only if the field does not exist
+     *
+     * @param key key of the hash
+     * @param field field to set
+     * @param value value to store
+     */
+    case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
+      def this(key: String, field: String, value: String) =
+        this(toBytes(key), toBytes(field), toBytes(value))
+    }
+
+  }
+
+  object HyperLogLog {
+
+    /**
+     * Adds the specified elements to the specified HyperLogLog
+     *
+     * @param key key of the HyperLogLog
+     * @param element element to add
+     */
+    case class PFADD(key: String, element: String)
+
+  }
+
+  object Lists {
+
+
+    /**
+     * Prepend one or multiple values to a list
+     *
+     * @param key key of the list
+     * @param value value to prepend
+     */
+    case class LPUSH(key: Array[Byte], value: Array[Byte]) {
+
+      // both arguments must be encoded: the primary constructor only accepts bytes
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Prepend a value to a list, only if the list exists
+     *
+     * @param key key of the list
+     * @param value value to prepend
+     */
+    case class LPUSHX(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Set the value of an element in a list by its index
+     *
+     * @param key key of the list
+     * @param index position of the element to overwrite
+     * @param value new value of the element
+     */
+    case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) {
+      def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value))
+    }
+
+    /**
+     * Append one or multiple values to a list
+     *
+     * @param key key of the list
+     * @param value value to append
+     */
+    case class RPUSH(key: Array[Byte], value: Array[Byte]) {
+
+      // both arguments must be encoded: the primary constructor only accepts bytes
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Append a value to a list, only if the list exists
+     *
+     * @param key key of the list
+     * @param value value to append
+     */
+    case class RPUSHX(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+  }
+
+  object Keys {
+
+    /**
+     * Delete a key
+     *
+     * @param message the key to delete
+     */
+    case class DEL(message: Array[Byte]) {
+
+      def this(message: String) = this(toBytes(message))
+    }
+
+    /**
+     * Set a key's time to live in seconds
+     *
+     * @param key key to expire
+     * @param seconds time to live, in seconds
+     */
+    case class EXPIRE(key: Array[Byte], seconds: Int) {
+      def this(key: String, seconds: Int) = this(toBytes(key), seconds)
+    }
+
+    /**
+     * Set the expiration for a key as a UNIX timestamp
+     *
+     * @param key key to expire
+     * @param timestamp expiration time as a UNIX timestamp
+     */
+    case class EXPIREAT(key: Array[Byte], timestamp: Long) {
+      def this(key: String, timestamp: Long) = this(toBytes(key), timestamp)
+    }
+
+    /**
+     * Atomically transfer a key from a Redis instance to another one.
+     *
+     * @param host host of the destination instance
+     * @param port port of the destination instance
+     * @param key key to transfer
+     * @param database destination database index
+     * @param timeout timeout for the transfer
+     */
+    case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) {
+      def this(host: String, port: Int, key: String, database: Int, timeout: Int) =
+        this(toBytes(host), port, toBytes(key), database, timeout)
+    }
+
+    /**
+     * Move a key to another database
+     *
+     * @param key key to move
+     * @param db destination database index
+     */
+    case class MOVE(key: Array[Byte], db: Int) {
+      def this(key: String, db: Int) = this(toBytes(key), db)
+    }
+
+    /**
+     * Remove the expiration from a key
+     *
+     * @param key key whose expiration is removed
+     */
+    case class PERSIST(key: Array[Byte]) {
+      def this(key: String) = this(toBytes(key))
+    }
+
+    /**
+     * Set a key's time to live in milliseconds
+     *
+     * @param key key to expire
+     * @param milliseconds time to live, in milliseconds
+     */
+    case class PEXPIRE(key: Array[Byte], milliseconds: Long) {
+      def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+    }
+
+    /**
+     * Set the expiration for a key as a UNIX timestamp specified in milliseconds
+     *
+     * @param key key to expire
+     * @param timestamp expiration time as a UNIX timestamp in milliseconds
+     */
+    case class PEXPIREAT(key: Array[Byte], timestamp: Long) {
+      // note: this overload names its second parameter `milliseconds`; it is the timestamp
+      def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+    }
+
+    /**
+     * Rename a key
+     *
+     * @param key current key name
+     * @param newKey new key name
+     */
+    case class RENAME(key: Array[Byte], newKey: Array[Byte]) {
+      def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+    }
+
+    /**
+     * Rename a key, only if the new key does not exist
+     *
+     * @param key current key name
+     * @param newKey new key name
+     */
+    case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) {
+      def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+    }
+
+  }
+
+
+  object Sets {
+
+    /**
+     * Add one or more members to a set
+     *
+     * @param key key of the set
+     * @param members member to add (a single encoded member despite the plural name)
+     */
+    case class SADD(key: Array[Byte], members: Array[Byte]) {
+
+      // both arguments must be encoded: the primary constructor only accepts bytes
+      def this(key: String, members: String) = this(toBytes(key), toBytes(members))
+    }
+
+
+    /**
+     * Move a member from one set to another
+     *
+     * @param source key of the set to move from
+     * @param destination key of the set to move to
+     * @param member member to move
+     */
+    case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) {
+      def this(source: String, destination: String, member: String) =
+        this(toBytes(source), toBytes(destination), toBytes(member))
+    }
+
+
+    /**
+     * Remove one or more members from a set
+     *
+     * @param key key of the set
+     * @param member member to remove
+     */
+    case class SREM(key: Array[Byte], member: Array[Byte]) {
+
+      // both arguments must be encoded: the primary constructor only accepts bytes
+      def this(key: String, member: String) = this(toBytes(key), toBytes(member))
+    }
+
+  }
+
+  object String {
+
+    /**
+     * Append a value to a key
+     *
+     * @param key key to append to
+     * @param value value to append
+     */
+    case class APPEND(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Decrement the integer value of a key by one
+     *
+     * @param key key to decrement
+     */
+    case class DECR(key: Array[Byte]) {
+      def this(key: String) = this(toBytes(key))
+    }
+
+    /**
+     * Decrement the integer value of a key by the given number
+     *
+     * @param key key to decrement
+     * @param decrement amount to subtract
+     */
+    case class DECRBY(key: Array[Byte], decrement: Int) {
+      def this(key: String, decrement: Int) = this(toBytes(key), decrement)
+    }
+
+    /**
+     * Increment the integer value of a key by one
+     *
+     * @param key key to increment
+     */
+    case class INCR(key: Array[Byte]) {
+      def this(key: String) = this(toBytes(key))
+    }
+
+    /**
+     * Increment the integer value of a key by the given amount
+     *
+     * @param key key to increment
+     * @param increment amount to add
+     */
+    case class INCRBY(key: Array[Byte], increment: Int) {
+      def this(key: String, increment: Int) = this(toBytes(key), increment)
+    }
+
+    /**
+     * Increment the float value of a key by the given amount
+     *
+     * @param key key to increment
+     * @param increment amount to add
+     */
+    case class INCRBYFLOAT(key: Array[Byte], increment: Double) {
+      // java.lang.Number has no implicit conversion to Double, so convert explicitly
+      def this(key: String, increment: Number) = this(toBytes(key), increment.doubleValue)
+    }
+
+
+    /**
+     * Set the string value of a key
+     *
+     * @param key key to set
+     * @param value value to store
+     */
+    case class SET(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Sets or clears the bit at offset in the string value stored at key
+     *
+     * @param key key holding the string
+     * @param offset bit offset
+     * @param value bit value, encoded as bytes (presumably "0" or "1" -- confirm)
+     */
+    case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) {
+      def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value))
+    }
+
+    /**
+     * Set the value and expiration of a key
+     *
+     * @param key key to set
+     * @param seconds time to live, in seconds
+     * @param value value to store
+     */
+    case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) {
+      def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value))
+    }
+
+    /**
+     * Set the value of a key, only if the key does not exist
+     *
+     * @param key key to set
+     * @param value value to store
+     */
+    case class SETNX(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Overwrite part of a string at key starting at the specified offset
+     *
+     * @param key key holding the string
+     * @param offset offset at which to start overwriting
+     * @param value bytes to write
+     */
+    case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) {
+      def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value))
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
new file mode 100644
index 0000000..3f75949
--- /dev/null
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.redis
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.redis.RedisMessage.Geo.GEOADD
+import org.apache.gearpump.redis.RedisMessage.Hashes._
+import org.apache.gearpump.redis.RedisMessage.HyperLogLog._
+import org.apache.gearpump.redis.RedisMessage.Keys._
+import org.apache.gearpump.redis.RedisMessage.Lists._
+import org.apache.gearpump.redis.RedisMessage.Sets._
+import org.apache.gearpump.redis.RedisMessage.String._
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.LogUtil
+import redis.clients.jedis.Jedis
+import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT}
+
+/**
+  * A [[DataSink]] that executes Redis commands against a single Redis instance through Jedis.
+  *
+  * Each message payload (`message.msg`) is expected to be one of the command case classes in
+  * [[RedisMessage]]; it is translated into the corresponding Jedis call.
+  *
+  * @param host     Redis server host
+  * @param port     Redis server port
+  * @param timeout  timeout passed to the Jedis client constructor
+  * @param database index of the database selected when the sink is opened
+  * @param password password sent with AUTH; null or empty disables authentication
+  */
+class RedisSink(
+    host: String = DEFAULT_HOST,
+    port: Int = DEFAULT_PORT,
+    timeout: Int = DEFAULT_TIMEOUT,
+    database: Int = DEFAULT_DATABASE,
+    password: String = "") extends DataSink {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  // Built lazily and excluded from serialization, presumably so the client is created on the
+  // executing task rather than shipped with the sink instance.
+  @transient private lazy val client = new Jedis(host, port, timeout)
+
+  override def open(context: TaskContext): Unit = {
+    // AUTH must precede SELECT: a password-protected server rejects every other command
+    // (SELECT included) with NOAUTH until the connection has authenticated.
+    if (Option(password).exists(_.nonEmpty)) {
+      client.auth(password)
+    }
+    client.select(database)
+  }
+
+  /**
+    * Executes the Redis command carried by `message.msg`.
+    *
+    * Payloads that are not a supported [[RedisMessage]] command are logged and skipped.
+    */
+  override def write(message: Message): Unit = {
+
+    message.msg match {
+      // Connection
+      case msg: RedisMessage.Connection.SELECT => client.select(msg.index)
+
+      // GEO
+      case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member)
+
+      // Hashes
+      case msg: HDEL => client.hdel(msg.key, msg.field)
+      case msg: HINCRBY => client.hincrBy(msg.key, msg.field, msg.increment)
+      case msg: HINCRBYFLOAT => client.hincrByFloat(msg.key, msg.field, msg.increment)
+      case msg: HSET => client.hset(msg.key, msg.field, msg.value)
+      case msg: HSETNX => client.hsetnx(msg.key, msg.field, msg.value)
+
+      // HyperLogLog
+      case msg: PFADD => client.pfadd(msg.key, msg.element)
+
+      // Lists
+      case msg: LPUSH => client.lpush(msg.key, msg.value)
+      case msg: LPUSHX => client.lpushx(msg.key, msg.value)
+      case msg: LSET => client.lset(msg.key, msg.index, msg.value)
+      case msg: RPUSH => client.rpush(msg.key, msg.value)
+      case msg: RPUSHX => client.rpushx(msg.key, msg.value)
+
+      // Keys
+      case msg: DEL => client.del(msg.message)
+      case msg: EXPIRE => client.expire(msg.key, msg.seconds)
+      case msg: EXPIREAT => client.expireAt(msg.key, msg.timestamp)
+      case msg: MIGRATE => client.migrate(msg.host, msg.port, msg.key, msg.database, msg.timeout)
+      case msg: MOVE => client.move(msg.key, msg.db)
+      case msg: PERSIST => client.persist(msg.key)
+      case msg: PEXPIRE => client.pexpire(msg.key, msg.milliseconds)
+      case msg: PEXPIREAT => client.pexpireAt(msg.key, msg.timestamp)
+      case msg: RENAME => client.rename(msg.key, msg.newKey)
+      case msg: RENAMENX => client.renamenx(msg.key, msg.newKey)
+
+      // Sets
+      case msg: SADD => client.sadd(msg.key, msg.members)
+      case msg: SMOVE => client.smove(msg.source, msg.destination, msg.member)
+      case msg: SREM => client.srem(msg.key, msg.member)
+
+      // String
+      case msg: APPEND => client.append(msg.key, msg.value)
+      case msg: DECR => client.decr(msg.key)
+      case msg: DECRBY => client.decrBy(msg.key, msg.decrement)
+      case msg: INCR => client.incr(msg.key)
+      case msg: INCRBY => client.incrBy(msg.key, msg.increment)
+      case msg: INCRBYFLOAT => client.incrByFloat(msg.key, msg.increment)
+      case msg: SET => client.set(msg.key, msg.value)
+      case msg: SETBIT => client.setbit(msg.key, msg.offset, msg.value)
+      case msg: SETEX => client.setex(msg.key, msg.seconds, msg.value)
+      case msg: SETNX => client.setnx(msg.key, msg.value)
+      case msg: SETRANGE => client.setrange(msg.key, msg.offset, msg.value)
+
+      // Anything else (including a null payload) previously surfaced as a MatchError.
+      case unsupported =>
+        LOG.warn(s"Skipping message with unsupported Redis command payload: $unsupported")
+    }
+  }
+
+  override def close(): Unit = {
+    client.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 0b1628e..f1e0443 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -154,12 +154,6 @@ object Build extends sbt.Build {
     dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion
   )
 
-  val streamingDependencies = Seq(
-    unmanagedJars in Compile ++= Seq(
-      getShadedJarFile("gs-collections", version.value)
-    )
-  )
-
   val coreDependencies = Seq(
     libraryDependencies ++= Seq(
       "org.slf4j" % "slf4j-api" % slf4jVersion,
@@ -199,9 +193,9 @@ object Build extends sbt.Build {
     ),
 
     unmanagedJars in Compile ++= Seq(
-      getShadedJarFile("metrics-graphite", version.value),
-      getShadedJarFile("guava", version.value),
-      getShadedJarFile("akka-kryo", version.value)
+      getShadedJarFile(shaded_metrics_graphite.id, version.value),
+      getShadedJarFile(shaded_guava.id, version.value),
+      getShadedJarFile(shaded_akka_kryo.id, version.value)
     )
   )
 
@@ -250,6 +244,20 @@ object Build extends sbt.Build {
       .map(_.filterNot(_.getCanonicalPath.contains("akka")))
   }
 
+  /**
+   * Walks a generated POM and appends `deps` to the children of each `<dependencies>` element
+   * it reaches (it does not descend into a matched element). Every other element is rebuilt
+   * with its children processed recursively; non-element nodes are returned unchanged.
+   */
+  private def addShadedDeps(deps: Seq[xml.Node], node: xml.Node): xml.Node = node match {
+    case elem: xml.Elem =>
+      val newChildren =
+        if (elem.label != "dependencies") elem.child.map(addShadedDeps(deps, _))
+        else elem.child ++ deps
+      xml.Elem(elem.prefix, elem.label, elem.attributes, elem.scope, false, newChildren: _*)
+    case other =>
+      other
+  }
+
   lazy val root = Project(
     id = "gearpump",
     base = file("."),
@@ -262,7 +270,14 @@ object Build extends sbt.Build {
   lazy val core = Project(
     id = "gearpump-core",
     base = file("core"),
-    settings = commonSettings ++ javadocSettings ++ coreDependencies)
+    settings = commonSettings ++ javadocSettings ++ coreDependencies ++ Seq(
+      pomPostProcess := {
+        (node: xml.Node) => addShadedDeps(List(
+          getShadedDepXML(organization.value, shaded_akka_kryo.id, version.value),
+          getShadedDepXML(organization.value, shaded_guava.id, version.value),
+          getShadedDepXML(organization.value, shaded_metrics_graphite.id, version.value)), node)
+      }
+    ))
       .disablePlugins(sbtassembly.AssemblyPlugin)
 
   lazy val daemon = Project(
@@ -282,9 +297,18 @@ object Build extends sbt.Build {
   lazy val streaming = Project(
     id = "gearpump-streaming",
     base = file("streaming"),
-    settings = commonSettings ++ javadocSettings ++ streamingDependencies)
-      .dependsOn(core % "test->test; compile->compile", daemon % "test->test")
-      .disablePlugins(sbtassembly.AssemblyPlugin)
+    settings = commonSettings ++ javadocSettings ++ Seq(
+      unmanagedJars in Compile ++= Seq(
+        getShadedJarFile(shaded_gs_collections.id, version.value)
+      ),
+
+      pomPostProcess := {
+        (node: xml.Node) => addShadedDeps(List(
+          getShadedDepXML(organization.value, shaded_gs_collections.id, version.value)), node)
+      }
+    ))
+    .dependsOn(core % "test->test; compile->compile", shaded_gs_collections, daemon % "test->test")
+    .disablePlugins(sbtassembly.AssemblyPlugin)
 
   lazy val external_kafka = Project(
     id = "gearpump-external-kafka",
@@ -402,6 +426,18 @@ object Build extends sbt.Build {
       .dependsOn (services % "test->test; compile->compile", daemon % "test->test; compile->compile")
       .disablePlugins(sbtassembly.AssemblyPlugin)
 
+  // Experimental Redis sink module (experiments/redis). Not published (noPublish); uses
+  // myAssemblySettings, and Jedis 2.9.0 is its only extra library dependency.
+  // streaming/daemon are scoped "provided" -- presumably supplied by the Gearpump runtime and
+  // therefore kept out of the assembly; verify against myAssemblySettings.
+  // NOTE(review): the main class below ("org.apache.gearpump.example.Test") is not among the
+  // redis sources added in this commit (RedisMessage, RedisSink) -- confirm it exists.
+  lazy val redis = Project(
+    id = "gearpump-experiments-redis",
+    base = file("experiments/redis"),
+    settings = commonSettings ++ noPublish ++ myAssemblySettings ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "redis.clients" % "jedis" % "2.9.0"
+        ),
+        mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test")
+      ))
+    .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+
   lazy val storm = Project(
     id = "gearpump-experiments-storm",
     base = file("experiments/storm"),

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/project/BuildShaded.scala
----------------------------------------------------------------------
diff --git a/project/BuildShaded.scala b/project/BuildShaded.scala
index 1f59bfd..a43587c 100644
--- a/project/BuildShaded.scala
+++ b/project/BuildShaded.scala
@@ -35,7 +35,7 @@ object BuildShaded extends sbt.Build {
       _.copy(includeScala = false)
     },
     assemblyJarName in assembly := {
-      s"${name.value}-$scalaVersionMajor-${version.value}-assembly.jar"
+      s"${name.value}_$scalaVersionMajor-${version.value}.jar"
     },
     target in assembly := baseDirectory.value.getParentFile / "target" / scalaVersionMajor
   )
@@ -44,92 +44,99 @@ object BuildShaded extends sbt.Build {
     id = "gearpump-shaded",
     base = file("shaded")
   ).aggregate(shaded_akka_kryo, shaded_gs_collections, shaded_guava, shaded_metrics_graphite)
-      .disablePlugins(sbtassembly.AssemblyPlugin)
-
+    .disablePlugins(sbtassembly.AssemblyPlugin)
 
   lazy val shaded_akka_kryo = Project(
     id = "gearpump-shaded-akka-kryo",
     base = file("shaded/akka-kryo"),
-    settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-akka-kryo",
-      "assembly"), sbtassembly.AssemblyKeys.assembly) ++
-        Seq(
-          assemblyShadeRules in assembly := Seq(
-            ShadeRule.zap("com.google.protobuf.**").inAll,
-            ShadeRule.zap("com.typesafe.config.**").inAll,
-            ShadeRule.zap("akka.**").inAll,
-            ShadeRule.zap("org.jboss.netty.**").inAll,
-            ShadeRule.zap("net.jpountz.lz4.**").inAll,
-            ShadeRule.zap("org.uncommons.maths.**").inAll,
-            ShadeRule.rename("com.romix.**" -> "org.apache.gearpump.romix.@1").inAll,
-            ShadeRule.rename("com.esotericsoftware.**" ->
-                "org.apache.gearpump.esotericsoftware.@1").inAll,
-            ShadeRule.rename("org.objenesis.**" -> "org.apache.gearpump.objenesis.@1").inAll
-          )
-        ) ++
-        Seq(
-          libraryDependencies ++= Seq(
-            "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion
-          )
+    settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-akka-kryo"),
+      sbtassembly.AssemblyKeys.assembly) ++
+      Seq(
+        assemblyShadeRules in assembly := Seq(
+          ShadeRule.zap("com.google.protobuf.**").inAll,
+          ShadeRule.zap("com.typesafe.config.**").inAll,
+          ShadeRule.zap("akka.**").inAll,
+          ShadeRule.zap("org.jboss.netty.**").inAll,
+          ShadeRule.zap("net.jpountz.lz4.**").inAll,
+          ShadeRule.zap("org.uncommons.maths.**").inAll,
+          ShadeRule.rename("com.romix.**" -> "org.apache.gearpump.romix.@1").inAll,
+          ShadeRule.rename("com.esotericsoftware.**" ->
+            "org.apache.gearpump.esotericsoftware.@1").inAll,
+          ShadeRule.rename("org.objenesis.**" -> "org.apache.gearpump.objenesis.@1").inAll
+        )
+      ) ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion
         )
+      )
   )
 
   lazy val shaded_gs_collections = Project(
     id = "gearpump-shaded-gs-collections",
     base = file("shaded/gs-collections"),
-    settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-gs-collections",
-      "assembly"), sbtassembly.AssemblyKeys.assembly) ++
-        Seq(
-          assemblyShadeRules in assembly := Seq(
-            ShadeRule.rename("com.gs.collections.**" ->
-                "org.apache.gearpump.gs.collections.@1").inAll
-          )
-        ) ++
-        Seq(
-          libraryDependencies ++= Seq(
-            "com.goldmansachs" % "gs-collections" % gsCollectionsVersion
-          )
+    settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-gs-collections"),
+      sbtassembly.AssemblyKeys.assembly) ++
+      Seq(
+        assemblyShadeRules in assembly := Seq(
+          ShadeRule.rename("com.gs.collections.**" ->
+            "org.apache.gearpump.gs.collections.@1").inAll
         )
+      ) ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "com.goldmansachs" % "gs-collections" % gsCollectionsVersion
+        )
+      )
   )
 
   lazy val shaded_guava = Project(
     id = "gearpump-shaded-guava",
     base = file("shaded/guava"),
-    settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-guava",
-      "assembly"), sbtassembly.AssemblyKeys.assembly) ++
-        Seq(
-          assemblyShadeRules in assembly := Seq(
-            ShadeRule.rename("com.google.**" -> "org.apache.gearpump.google.@1").inAll
-          )
-        ) ++
-        Seq(
-          libraryDependencies ++= Seq(
-            "com.google.guava" % "guava" % guavaVersion
-          )
+    settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-guava"),
+      sbtassembly.AssemblyKeys.assembly) ++
+      Seq(
+        assemblyShadeRules in assembly := Seq(
+          ShadeRule.rename("com.google.**" -> "org.apache.gearpump.google.@1").inAll
+        )
+      ) ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "com.google.guava" % "guava" % guavaVersion
         )
+      )
   )
 
   lazy val shaded_metrics_graphite = Project(
     id = "gearpump-shaded-metrics-graphite",
     base = file("shaded/metrics-graphite"),
-    settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-metrics-graphite",
-      "assembly"), sbtassembly.AssemblyKeys.assembly) ++
-        Seq(
-          assemblyShadeRules in assembly := Seq(
-            ShadeRule.rename("com.codahale.metrics.**" ->
-                "org.apache.gearpump.codahale.metrics.@1").inAll
-          )
-        ) ++
-        Seq(
-          libraryDependencies ++= Seq(
-            "com.codahale.metrics" % "metrics-graphite" % codahaleVersion,
-            "com.codahale.metrics" % "metrics-jvm" % codahaleVersion
-          )
+    settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-metrics-graphite"),
+      sbtassembly.AssemblyKeys.assembly) ++
+      Seq(
+        assemblyShadeRules in assembly := Seq(
+          ShadeRule.rename("com.codahale.metrics.**" ->
+            "org.apache.gearpump.codahale.metrics.@1").inAll
         )
+      ) ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "com.codahale.metrics" % "metrics-graphite" % codahaleVersion,
+          "com.codahale.metrics" % "metrics-jvm" % codahaleVersion
+        )
+      )
   )
 
   def getShadedJarFile(name: String, gearpumpVersion: String): File = {
     shaded.base / "target" / scalaVersionMajor /
-      s"gearpump-shaded-$name-$scalaVersionMajor-$gearpumpVersion-assembly.jar"
+      s"${name}_$scalaVersionMajor-$gearpumpVersion.jar"
+  }
+
+  /**
+   * Builds a Maven `<dependency>` element for the given coordinates. The projects that compile
+   * against shaded jars as unmanaged jars pass these elements to `addShadedDeps` in their
+   * `pomPostProcess`, so the shaded artifacts appear as dependencies in the generated POM.
+   *
+   * @param groupId    Maven group id
+   * @param artifactId Maven artifact id
+   * @param version    artifact version
+   * @return the `<dependency>` XML node
+   */
+  def getShadedDepXML(groupId: String, artifactId: String, version: String): scala.xml.Node = {
+    <dependency>
+      <groupId>{groupId}</groupId>
+      <artifactId>{artifactId}</artifactId>
+      <version>{version}</version>
+    </dependency>
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
index cd33b50..f99a436 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -22,6 +22,7 @@ object Constants {
   val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator"
   val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source"
   val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function"
+  val GEARPUMP_STREAMING_WINDOW_FUNCTION = "gearpump.streaming.dsl.window-function"
 
   val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities"
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index 66ec873..a6588a1 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -123,7 +123,7 @@ object LifeTime {
  */
 class StreamApplication(
     override val name: String, val inputUserConfig: UserConfig,
-    val dag: Graph[ProcessorDescription, PartitionerDescription])
+    dag: Graph[ProcessorDescription, PartitionerDescription])
   extends Application {
 
   require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
index 786d496..440a45e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
@@ -20,7 +20,10 @@ package org.apache.gearpump.streaming.dsl
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.op._
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.dsl.window.impl._
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
@@ -35,12 +38,12 @@ class Stream[T](
   /**
    * converts a value[T] to a list of value[R]
    *
-   * @param fun FlatMap function
+   * @param fn FlatMap function
    * @param description The description message for this operation
    * @return A new stream with type [R]
    */
-  def flatMap[R](fun: T => TraversableOnce[R], description: String = null): Stream[R] = {
-    val flatMapOp = FlatMapOp(fun, Option(description).getOrElse("flatmap"))
+  def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = {
+    val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description))
     graph.addVertex(flatMapOp)
     graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp)
     new Stream[R](graph, flatMapOp)
@@ -49,36 +52,36 @@ class Stream[T](
   /**
    * Maps message of type T message of type R
    *
-   * @param fun Function
+   * @param fn Function
    * @return A new stream with type [R]
    */
-  def map[R](fun: T => R, description: String = null): Stream[R] = {
+  def map[R](fn: T => R, description: String = "map"): Stream[R] = {
     this.flatMap({ data =>
-      Option(fun(data))
-    }, Option(description).getOrElse("map"))
+      Option(fn(data))
+    }, description)
   }
 
   /**
    * Keeps records when fun(T) == true
    *
-   * @param fun  the filter
+   * @param fn  the filter
    * @return  a new stream after filter
    */
-  def filter(fun: T => Boolean, description: String = null): Stream[T] = {
+  def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
     this.flatMap({ data =>
-      if (fun(data)) Option(data) else None
-    }, Option(description).getOrElse("filter"))
+      if (fn(data)) Option(data) else None
+    }, description)
   }
 
   /**
    * Reduces operations.
    *
-   * @param fun  reduction function
+   * @param fn  reduction function
    * @param description description message for this operator
    * @return a new stream after reduction
    */
-  def reduce(fun: (T, T) => T, description: String = null): Stream[T] = {
-    val reduceOp = ReduceOp(fun, Option(description).getOrElse("reduce"))
+  def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
+    val reduceOp = ChainableOp(new ReduceFunction(fn, description))
     graph.addVertex(reduceOp)
     graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp)
     new Stream(graph, reduceOp)
@@ -88,7 +91,10 @@ class Stream[T](
    * Log to task log file
    */
   def log(): Unit = {
-    this.map(msg => LoggerFactory.getLogger("dsl").info(msg.toString), "log")
+    this.map(msg => {
+      LoggerFactory.getLogger("dsl").info(msg.toString)
+      msg
+    }, "log")
   }
 
   /**
@@ -97,8 +103,8 @@ class Stream[T](
    * @param other the other stream
    * @return  the merged stream
    */
-  def merge(other: Stream[T], description: String = null): Stream[T] = {
-    val mergeOp = MergeOp(Option(description).getOrElse("merge"))
+  def merge(other: Stream[T], description: String = "merge"): Stream[T] = {
+    val mergeOp = MergeOp(description, UserConfig.empty)
     graph.addVertex(mergeOp)
     graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
     graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
@@ -115,20 +121,29 @@ class Stream[T](
    *
    * For example,
    * {{{
-   * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
+   * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
    * }}}
    *
-   * @param fun  Group by function
+   * @param fn  Group by function
    * @param parallelism  Parallelism level
    * @param description  The description
    * @return  the grouped stream
    */
-  def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null)
-    : Stream[T] = {
-    val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy"))
-    graph.addVertex(groupOp)
-    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
-    new Stream[T](graph, groupOp)
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    window(CountWindow.apply(1).accumulating)
+      .groupBy[GROUP](fn, parallelism, description)
+  }
+
+  /**
+   * Window function
+   *
+   * @param win window definition
+   * @param description window description
+   * @return [[WindowStream]] where groupBy could be applied
+   */
+  def window(win: Window, description: String = "window"): WindowStream[T] = {
+    new WindowStream[T](graph, edge, thisNode, win, description)
   }
 
   /**
@@ -140,15 +155,28 @@ class Stream[T](
    */
   def process[R](
       processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
-      description: String = null): Stream[R] = {
-    val processorOp = ProcessorOp(processor, parallelism, conf,
-      Option(description).getOrElse("process"))
+      description: String = "process"): Stream[R] = {
+    val processorOp = ProcessorOp(processor, parallelism, conf, description)
     graph.addVertex(processorOp)
     graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
     new Stream[R](graph, processorOp, Some(Shuffle))
   }
 }
 
+class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op,
+    window: Window, winDesc: String) {
+
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window)
+    val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism,
+      s"$winDesc.$description")
+    graph.addVertex(groupOp)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
+    new Stream[T](graph, groupOp)
+  }
+}
+
 class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
   /**
    * GroupBy key
@@ -192,30 +220,18 @@ object Stream {
   }
 
   implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
-    def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, description: String)
-      : Stream[T] = {
-      implicit val sink = DataSinkOp[T](dataSink, parallism, conf,
-        Some(description).getOrElse("traversable"))
+    def sink(dataSink: DataSink, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = {
+      implicit val sink = DataSinkOp(dataSink, parallelism, conf, description)
       stream.graph.addVertex(sink)
       stream.graph.addEdge(stream.thisNode, Shuffle, sink)
       new Stream[T](stream.graph, sink)
     }
-
-    def sink[T](
-        sink: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty,
-        description: String = null): Stream[T] = {
-      val sinkOp = ProcessorOp(sink, parallism, conf, Option(description).getOrElse("source"))
-      stream.graph.addVertex(sinkOp)
-      stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
-      new Stream[T](stream.graph, sinkOp)
-    }
   }
 }
 
 class LoggerSink[T] extends DataSink {
-  var logger: Logger = null
-
-  private var context: TaskContext = null
+  var logger: Logger = _
 
   override def open(context: TaskContext): Unit = {
     this.logger = context.logger

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
index d45737b..8116146 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
@@ -24,10 +24,9 @@ import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.streaming.StreamApplication
-import org.apache.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp}
-import org.apache.gearpump.streaming.dsl.plan.Planner
+import org.apache.gearpump.streaming.dsl.plan._
 import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.Message
 
@@ -50,7 +49,8 @@ import scala.language.implicitConversions
  * @param name name of app
  */
 class StreamApp(
-    val name: String, system: ActorSystem, userConfig: UserConfig, val graph: Graph[Op, OpEdge]) {
+    name: String, system: ActorSystem, userConfig: UserConfig,
+    private val graph: Graph[Op, OpEdge]) {
 
   def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
     this(name, system, userConfig, Graph.empty[Op, OpEdge])
@@ -76,34 +76,16 @@ object StreamApp {
 
   implicit class Source(app: StreamApp) extends java.io.Serializable {
 
-    def source[T](dataSource: DataSource, parallelism: Int): Stream[T] = {
-      source(dataSource, parallelism, UserConfig.empty)
-    }
-
-    def source[T](dataSource: DataSource, parallelism: Int, description: String): Stream[T] = {
-      source(dataSource, parallelism, UserConfig.empty, description)
-    }
-
-    def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig): Stream[T] = {
-      source(dataSource, parallelism, conf, description = null)
-    }
-
-    def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String)
-      : Stream[T] = {
+    def source[T](dataSource: DataSource, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
       implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description)
       app.graph.addVertex(sourceOp)
       new Stream[T](app.graph, sourceOp)
     }
+
     def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
       this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
     }
-
-    def source[T](source: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String)
-      : Stream[T] = {
-      val sourceOp = ProcessorOp(source, parallelism, conf, Option(description).getOrElse("source"))
-      app.graph.addVertex(sourceOp)
-      new Stream[T](app.graph, sourceOp)
-    }
   }
 }
 
@@ -115,7 +97,7 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
 
   override def read(): Message = {
     if (iterator.hasNext) {
-      Message(iterator.next())
+      Message(iterator.next(), Instant.now().toEpochMilli)
     } else {
       null
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index 6eff20c..3003b98 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -19,9 +19,9 @@
 package org.apache.gearpump.streaming.dsl.javaapi
 
 import scala.collection.JavaConverters._
-
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.Stream
+import org.apache.gearpump.streaming.dsl.window.api.Window
+import org.apache.gearpump.streaming.dsl.{Stream, WindowStream}
 import org.apache.gearpump.streaming.javaapi.dsl.functions._
 import org.apache.gearpump.streaming.task.Task
 
@@ -63,9 +63,13 @@ class JavaStream[T](val stream: Stream[T]) {
    * Group by a stream and turns it to a list of sub-streams. Operations chained after
    * groupBy applies to sub-streams.
    */
-  def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String)
-    : JavaStream[T] = {
-    new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, description))
+  def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
+      parallelism: Int, description: String): JavaStream[T] = {
+    new JavaStream[T](stream.groupBy((t: T) => fn(t), parallelism, description))
+  }
+
+  def window(win: Window, description: String): JavaWindowStream[T] = {
+    new JavaWindowStream[T](stream.window(win, description))
   }
 
   /** Add a low level Processor to process messages */
@@ -75,3 +79,11 @@ class JavaStream[T](val stream: Stream[T]) {
     new JavaStream[R](stream.process(processor, parallelism, conf, description))
   }
 }
+
+class JavaWindowStream[T](stream: WindowStream[T]) {
+
+  def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
+      description: String): JavaStream[T] = {
+    new JavaStream[T](stream.groupBy((t: T) => fn(t), parallelism, description))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
deleted file mode 100644
index 49d9dec..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.op
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.Task
-
-/**
- * Operators for the DSL
- */
-sealed trait Op {
-  def description: String
-  def conf: UserConfig
-}
-
-/**
- * When translated to running DAG, SlaveOP can be attach to MasterOP or other SlaveOP
- * "Attach" means running in same Actor.
- */
-trait SlaveOp[T] extends Op
-
-case class FlatMapOp[T, R](
-    fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = UserConfig.empty)
-  extends SlaveOp[T]
-
-case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig = UserConfig.empty)
-  extends SlaveOp[T]
-
-trait MasterOp extends Op
-
-trait ParameterizedOp[T] extends MasterOp
-
-case class MergeOp(description: String, override val conf: UserConfig = UserConfig.empty)
-  extends MasterOp
-
-case class GroupByOp[T, R](
-    fun: T => R, parallelism: Int, description: String,
-    override val conf: UserConfig = UserConfig.empty)
-  extends ParameterizedOp[T]
-
-case class ProcessorOp[T <: Task](
-    processor: Class[T], parallelism: Int, conf: UserConfig, description: String)
-  extends ParameterizedOp[T]
-
-case class DataSourceOp[T](
-    dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String)
-  extends ParameterizedOp[T]
-
-case class DataSinkOp[T](
-    dataSink: DataSink, parallelism: Int, conf: UserConfig, description: String)
-  extends ParameterizedOp[T]
-
-/**
- * Contains operators which can be chained to single one.
- *
- * For example, flatmap().map().reduce() can be chained to single operator as
- * no data shuffling is required.
- * @param ops list of operations
- */
-case class OpChain(ops: List[Op]) extends Op {
-  def head: Op = ops.head
-  def last: Op = ops.last
-
-  def description: String = null
-
-  override def conf: UserConfig = {
-    // The head's conf has priority
-    ops.reverse.foldLeft(UserConfig.empty) { (conf, op) =>
-      conf.withConfig(op.conf)
-    }
-  }
-}
-
-trait OpEdge
-
-/**
- * The upstream OP and downstream OP doesn't require network data shuffle.
- *
- * For example, map, flatmap operation doesn't require network shuffle, we can use Direct
- * to represent the relation with upstream operators.
- */
-case object Direct extends OpEdge
-
-/**
- * The upstream OP and downstream OP DOES require network data shuffle.
- *
- * For example, map, flatmap operation doesn't require network shuffle, we can use Direct
- * to represent the relation with upstream operators.
- */
-case object Shuffle extends OpEdge
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
new file mode 100644
index 0000000..2ec881b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.partitioner
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.partitioner.UnicastPartitioner
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+
+/**
+ * Partitions messages by applying a [[GroupByFn]] first.
+ *
+ * For example:
+ * {{{
+ * case class People(name: String, gender: String)
+ *
+ * object Test {
+ *   val groupBy: (People => String) = people => people.gender
+ *   val groupByFn = GroupAlsoByWindow(groupBy, CountWindow.apply(1).accumulating)
+ *   val partitioner = new GroupByPartitioner(groupByFn)
+ * }
+ * }}}
+ *
+ * @param fn applied to each message via `fn.groupBy(message)`; the hashCode of the result
+ *   picks the partition, so the group type must define a stable hashCode().
+ */
+class GroupByPartitioner[T, Group](fn: GroupByFn[T, Group])
+  extends UnicastPartitioner {
+  override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    val hashCode = fn.groupBy(message).hashCode()
+    (hashCode & Integer.MAX_VALUE) % partitionNum
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
deleted file mode 100644
index b2e2932..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.partitioner
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.partitioner.UnicastPartitioner
-
-/**
- * Partition messages by applying group by function first.
- *
- * For example:
- * {{{
- * case class People(name: String, gender: String)
- *
- * object Test{
- *
- *   val groupBy: (People => String) = people => people.gender
- *   val partitioner = GroupByPartitioner(groupBy)
- * }
- * }}}
- *
- * @param groupBy First apply message with groupBy function, then pick the hashCode of the output
- *   to do the partitioning. You must define hashCode() for output type of groupBy function.
- */
-class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends UnicastPartitioner {
-  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-    val hashCode = groupBy(msg.msg.asInstanceOf[T]).hashCode()
-    (hashCode & Integer.MAX_VALUE) % partitionNum
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
new file mode 100644
index 0000000..744976b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.plan
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.Processor.DefaultProcessor
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.{Constants, Processor}
+import org.apache.gearpump.streaming.dsl.task.TransformTask
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
+import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
+import org.apache.gearpump.streaming.task.Task
+
+import scala.reflect.ClassTag
+
+/**
+ * This is a vertex on the logical plan.
+ */
+sealed trait Op {
+
+  def description: String
+
+  def userConfig: UserConfig
+
+  def chain(op: Op)(implicit system: ActorSystem): Op
+
+  def getProcessor(implicit system: ActorSystem): Processor[_ <: Task]
+}
+
+/**
+ * This represents a low level Processor.
+ */
+case class ProcessorOp[T <: Task](
+    processor: Class[T],
+    parallelism: Int,
+    userConfig: UserConfig,
+    description: String)
+  extends Op {
+
+  def this(
+      parallelism: Int = 1,
+      userConfig: UserConfig = UserConfig.empty,
+      description: String = "processor")(implicit classTag: ClassTag[T]) = {
+    this(classTag.runtimeClass.asInstanceOf[Class[T]], parallelism, userConfig, description)
+  }
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    throw new OpChainException(this, other)
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    DefaultProcessor(parallelism, description, userConfig, processor)
+  }
+}
+
+/**
+ * This represents a DataSource.
+ */
+case class DataSourceOp(
+    dataSource: DataSource,
+    parallelism: Int = 1,
+    userConfig: UserConfig = UserConfig.empty,
+    description: String = "source")
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[_, _] =>
+        DataSourceOp(dataSource, parallelism,
+          userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn),
+          description)
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    Processor[DataSourceTask[Any, Any]](parallelism, description,
+      userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
+  }
+}
+
+/**
+ * This represents a DataSink.
+ */
+case class DataSinkOp(
+    dataSink: DataSink,
+    parallelism: Int = 1,
+    userConfig: UserConfig = UserConfig.empty,
+    description: String = "sink")
+  extends Op {
+
+  override def chain(op: Op)(implicit system: ActorSystem): Op = {
+    throw new OpChainException(this, op)
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    DataSinkProcessor(dataSink, parallelism, description)
+  }
+}
+
+/**
+ * Operations that can be fused with each other (e.g. flatMap, map, filter,
+ * reduce) and folded into a DataSourceOp, GroupByOp or MergeOp. On its own
+ * it cannot be translated to a Processor.
+ */
+case class ChainableOp[IN, OUT](
+    fn: SingleInputFunction[IN, OUT]) extends Op {
+
+  override def description: String = fn.description
+
+  override def userConfig: UserConfig = UserConfig.empty
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[OUT @unchecked, _] =>
+        // OUT is erased at runtime, so it is trusted rather than checked. TODO: preserve type info
+        ChainableOp(fn.andThen(op.fn))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    throw new UnsupportedOperationException("ChainableOp cannot be translated to Processor")
+  }
+}
+
+/**
+ * This represents a Processor with window aggregation
+ */
+case class GroupByOp[IN, GROUP](
+    groupByFn: GroupByFn[IN, GROUP],
+    parallelism: Int = 1,
+    description: String = "groupBy",
+    override val userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[_, _] =>
+        GroupByOp(groupByFn, parallelism, description,
+          userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    groupByFn.getProcessor(parallelism, description, userConfig)
+  }
+}
+
+/**
+ * This represents a Processor transforming merged streams
+ */
+case class MergeOp(description: String, userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[_, _] =>
+        MergeOp(description, userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    Processor[TransformTask[Any, Any]](1, description, userConfig)
+  }
+
+}
+
+/**
+ * This is an edge on the logical plan.
+ */
+trait OpEdge
+
+/**
+ * The upstream OP and downstream OP doesn't require network data shuffle.
+ * e.g. ChainableOp
+ */
+case object Direct extends OpEdge
+
+/**
+ * The upstream OP and downstream OP DOES require network data shuffle.
+ * e.g. GroupByOp
+ */
+case object Shuffle extends OpEdge
+
+/**
+ * Runtime exception thrown on chaining.
+ */
+class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 cannot be chained by $op2")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
deleted file mode 100644
index b09d9b9..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.plan
-
-import scala.collection.TraversableOnce
-import akka.actor.ActorSystem
-import org.slf4j.Logger
-import org.apache.gearpump._
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.op._
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.sink.DataSinkProcessor
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Translates a OP to a TaskDescription
- *
- * Each OpChain (the Ops of one planned vertex) becomes one Processor. The head Op decides the
- * task type. The Ops after it are folded into a single SingleInputFunction, which is stored in
- * the task's UserConfig under GEARPUMP_STREAMING_OPERATOR.
- */
-class OpTranslator extends java.io.Serializable {
-  val LOG: Logger = LogUtil.getLogger(getClass)
-
-  /**
-   * Builds the Processor for one chain of Ops.
-   *
-   * @param ops the chain to translate; `ops.conf` is the base UserConfig of the task
-   * @return a Processor whose kind is chosen by the chain's head Op
-   * @throws RuntimeException if the head Op is itself an OpChain, or (via toFunction) if a
-   *                          composed Op is neither a FlatMapOp nor a ReduceOp
-   * NOTE(review): `ops.ops.head` throws NoSuchElementException for an empty chain.
-   */
-  def translate(ops: OpChain)(implicit system: ActorSystem): Processor[_ <: Task] = {
-
-    val baseConfig = ops.conf
-
-    ops.ops.head match {
-      case op: MasterOp =>
-        // A MasterOp head picks the processor kind; only the Ops after it are composed.
-        val tail = ops.ops.tail
-        val func = toFunction(tail)
-        val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)
-
-        op match {
-          case DataSourceOp(dataSource, parallelism, conf, description) =>
-            Processor[DataSourceTask[Any, Any]](parallelism,
-              description = description + "." + func.description,
-              userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
-          case groupby@GroupByOp(_, parallelism, description, _) =>
-            Processor[GroupByTask[Object, Object, Object]](parallelism,
-              description = description + "." + func.description,
-              userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby))
-          case merge: MergeOp =>
-            // NOTE(review): with an empty tail, `func` is the DummyInputFunction. Its process()
-            // emits nothing, so TransformTask would drop every message. Confirm that a merge
-            // chain always has a tail.
-            Processor[TransformTask[Object, Object]](1,
-              description = op.description + "." + func.description,
-              userConfig)
-          case ProcessorOp(processor, parallelism, conf, description) =>
-            DefaultProcessor(parallelism,
-              description = description + " " + func.description,
-              userConfig, processor)
-          case DataSinkOp(dataSink, parallelism, conf, description) =>
-            // NOTE(review): unlike the other cases, descriptions are joined without a
-            // separator, and `userConfig` (holding the composed operator) is not passed on.
-            DataSinkProcessor(dataSink, parallelism, description + func.description)
-        }
-      case op: SlaveOp[_] =>
-        // No MasterOp head: the whole chain runs in one TransformTask with parallelism 1.
-        val func = toFunction(ops.ops)
-        val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)
-
-        Processor[TransformTask[Object, Object]](1,
-          description = func.description,
-          taskConf = userConfig)
-      case chain: OpChain =>
-        throw new RuntimeException("Not supposed to be called!")
-    }
-  }
-
-  /**
-   * Composes `ops` left to right into one SingleInputFunction.
-   *
-   * The fold starts from a DummyInputFunction. Its `andThen` returns its argument unchanged,
-   * so the placeholder disappears once a real function is appended. For an empty list, the
-   * DummyInputFunction itself is returned. Only FlatMapOp and ReduceOp are supported; any
-   * other Op throws RuntimeException. The type arguments in the patterns are unchecked
-   * (erased).
-   */
-  private def toFunction(ops: List[Op]): SingleInputFunction[Object, Object] = {
-    val func: SingleInputFunction[Object, Object] = new DummyInputFunction[Object]()
-    val totalFunction = ops.foldLeft(func) { (fun, op) =>
-
-      val opFunction = op match {
-        case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] =>
-          new FlatMapFunction(flatmap.fun, flatmap.description)
-        case reduce: ReduceOp[Object @unchecked] =>
-          new ReduceFunction(reduce.fun, reduce.description)
-        case _ =>
-          throw new RuntimeException("Not supposed to be called!")
-      }
-      fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]])
-    }
-    totalFunction.asInstanceOf[SingleInputFunction[Object, Object]]
-  }
-}
-
-/** Function and task types used by [[OpTranslator]] to run composed DSL operators. */
-object OpTranslator {
-
-  /** A serializable per-element transformation producing zero or more outputs per input. */
-  trait SingleInputFunction[IN, OUT] extends Serializable {
-    def process(value: IN): TraversableOnce[OUT]
-    /** Returns a function that feeds every output of this function into `other`. */
-    def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
-      new AndThen(this, other)
-    }
-
-    def description: String
-  }
-
-  /**
-   * Placeholder seed for composing functions. `andThen` returns `other` unchanged, so the
-   * dummy drops out of any non-empty composition.
-   */
-  class DummyInputFunction[T] extends SingleInputFunction[T, T] {
-    override def andThen[OUTER](other: SingleInputFunction[T, OUTER])
-    : SingleInputFunction[T, OUTER] = {
-      other
-    }
-
-    // Should never be called
-    // (if it is, e.g. as the operator of an empty chain, it emits nothing)
-    override def process(value: T): TraversableOnce[T] = None
-
-    override def description: String = ""
-  }
-
-  /** Sequential composition: each output of `first` is processed by `second`. */
-  class AndThen[IN, MIDDLE, OUT](
-      first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
-    extends SingleInputFunction[IN, OUT] {
-
-    override def process(value: IN): TraversableOnce[OUT] = {
-      first.process(value).flatMap(second.process)
-    }
-
-    // "first.second", or null if either description is null.
-    override def description: String = {
-      Option(first.description).flatMap { description =>
-        Option(second.description).map(description + "." + _)
-      }.orNull
-    }
-  }
-
-  /** Applies `fun` to each input and emits all of its results. */
-  class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], descriptionMessage: String)
-    extends SingleInputFunction[IN, OUT] {
-
-    override def process(value: IN): TraversableOnce[OUT] = {
-      fun(value)
-    }
-
-    override def description: String = {
-      this.descriptionMessage
-    }
-  }
-
-  /**
-   * Running reduction: every input updates the aggregate, and the current aggregate is
-   * emitted after each input.
-   * NOTE(review): `state == null` doubles as "no value yet", so a null aggregate (or a null
-   * first value) restarts the accumulation.
-   */
-  class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String)
-    extends SingleInputFunction[T, T] {
-
-    // Untyped accumulator; starts as null (the default for `_`).
-    private var state: Any = _
-
-    override def process(value: T): TraversableOnce[T] = {
-      if (state == null) {
-        state = value
-      } else {
-        state = fun(state.asInstanceOf[T], value)
-      }
-      Some(state.asInstanceOf[T])
-    }
-
-    override def description: String = descriptionMessage
-  }
-
-  /**
-   * Task that applies a separate operator instance to each group of messages.
-   *
-   * The group key of every message is computed with `groupBy`. The first time a key is seen,
-   * the operator is read from GEARPUMP_STREAMING_OPERATOR and cached for that key. Outputs
-   * keep the input message's timestamp.
-   * NOTE(review): per-group state (e.g. ReduceFunction) is only isolated if `getValue`
-   * returns a fresh instance on every call; confirm UserConfig.getValue semantics. `groups`
-   * also grows with the number of distinct keys and is never pruned.
-   */
-  class GroupByTask[IN, GROUP, OUT](
-      groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig)
-    extends Task(taskContext, userConf) {
-
-    // Reads the GroupByOp stored under GEARPUMP_STREAMING_GROUPBY_FUNCTION and uses its `fun`;
-    // `.get` throws if the entry is missing.
-    def this(taskContext: TaskContext, userConf: UserConfig) = {
-      this(userConf.getValue[GroupByOp[IN, GROUP]](
-        GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun,
-        taskContext, userConf)
-    }
-
-    private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]]
-
-    override def onNext(msg: Message): Unit = {
-      val time = msg.timestamp
-
-      val group = groupBy(msg.msg.asInstanceOf[IN])
-      if (!groups.contains(group)) {
-        val operator =
-          userConf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
-        groups += group -> operator
-      }
-
-      val operator = groups(group)
-
-      // The lambda parameter shadows `msg`: it is an output element, not the input Message.
-      operator.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
-        taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
-      }
-    }
-  }
-
-  /**
-   * Task that runs an optional operator on each message, keeping the message timestamp.
-   * When no operator is configured, messages are forwarded unchanged.
-   */
-  class TransformTask[IN, OUT](
-      operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
-      userConf: UserConfig) extends Task(taskContext, userConf) {
-
-    def this(taskContext: TaskContext, userConf: UserConfig) = {
-      this(userConf.getValue[SingleInputFunction[IN, OUT]](
-        GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
-    }
-
-    override def onNext(msg: Message): Unit = {
-      val time = msg.timestamp
-
-      operator match {
-        case Some(op) =>
-          op.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
-            taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
-          }
-        case None =>
-          taskContext.output(new Message(msg.msg, time))
-      }
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index f5bbd65..16d5c06 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -22,7 +22,6 @@ import akka.actor.ActorSystem
 
 import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.streaming.dsl.op._
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.task.Task
 import org.apache.gearpump.util.Graph
@@ -33,64 +32,60 @@ class Planner {
    * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low
    * level Graph API.
    */
-  def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem)
-    : Graph[Processor[_ <: Task], _ <: Partitioner] = {
+  def plan(dag: Graph[Op, OpEdge])
+    (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = {
 
-    val opTranslator = new OpTranslator()
-
-    val newDag = optimize(dag)
-    newDag.mapEdge { (node1, edge, node2) =>
+    // First collapse Direct-connected Ops (optimize works on `dag.copy`). Then map every edge
+    // to a partitioner and every Op to the Processor built by its `getProcessor`.
+    val graph = optimize(dag)
+    graph.mapEdge { (node1, edge, node2) =>
       edge match {
         case Shuffle =>
-          node2.head match {
-            case groupBy: GroupByOp[Any @unchecked, Any @unchecked] =>
-              new GroupByPartitioner(groupBy.fun)
+          // A shuffle into a GroupByOp partitions by its group-by function;
+          // any other shuffle falls back to hash partitioning.
+          node2 match {
+            case groupBy: GroupByOp[_, _] =>
+              new GroupByPartitioner(groupBy.groupByFn)
             case _ => new HashPartitioner
           }
         case Direct =>
+          // Direct (no-shuffle) edges that were not chained away use co-location.
           new CoLocationPartitioner
       }
-    }.mapVertex { opChain =>
-      opTranslator.translate(opChain)
-    }
+    }.mapVertex(_.getProcessor)
   }
 
-  private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = {
-    val newGraph = dag.mapVertex(op => OpChain(List(op)))
-
-    val nodes = newGraph.topologicalOrderWithCirclesIterator.toList.reverse
+  /**
+   * Returns a graph in which every chainable pair of Ops joined by a Direct edge has been
+   * collapsed into a single Op (see `merge`).
+   * NOTE(review): the caller's `dag` stays untouched only if `Graph.copy` returns an
+   * independent graph; confirm.
+   */
+  private def optimize(dag: Graph[Op, OpEdge])
+    (implicit system: ActorSystem): Graph[Op, OpEdge] = {
+    // merge() adds and removes vertices in place, so operate on a copy.
+    val graph = dag.copy
+    // Reversed topological order visits downstream nodes first. By the time a node is
+    // considered, its successor has already absorbed its own chainable successors (for the
+    // acyclic part of the graph).
+    val nodes = graph.topologicalOrderWithCirclesIterator.toList.reverse
     for (node <- nodes) {
-      val outGoingEdges = newGraph.outgoingEdgesOf(node)
+      val outGoingEdges = graph.outgoingEdgesOf(node)
       for (edge <- outGoingEdges) {
-        merge(newGraph, edge._1, edge._3)
+        merge(graph, edge._1, edge._3)
       }
     }
-    newGraph
+    graph
   }
 
-  private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: OpChain)
-    : Graph[OpChain, OpEdge] = {
-    if (dag.outDegreeOf(node1) == 1 &&
-      dag.inDegreeOf(node2) == 1 &&
+  /**
+   * Replaces `node1` and its successor `node2` with `node1.chain(node2)`, mutating `graph`.
+   *
+   * The merge only happens when all of the following hold; otherwise `graph` is unchanged:
+   *  - node1 has exactly one outgoing edge;
+   *  - node2 has exactly one incoming edge;
+   *  - neither node is a ProcessorOp;
+   *  - the edge between them is Direct.
+   * The chained Op takes over node1's incoming edges and node2's outgoing edges, and then
+   * both original vertices are removed.
+   * NOTE(review): `chain` may throw OpChainException (MergeOp.chain does for a
+   * non-ChainableOp); it is not caught here.
+   */
+  private def merge(graph: Graph[Op, OpEdge], node1: Op, node2: Op)
+    (implicit system: ActorSystem): Unit = {
+    if (graph.outDegreeOf(node1) == 1 &&
+      graph.inDegreeOf(node2) == 1 &&
       // For processor node, we don't allow it to merge with downstream operators
-      !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) {
-      val (_, edge, _) = dag.outgoingEdgesOf(node1).head
+      // (nor with upstream operators: node2 must not be a ProcessorOp either)
+      !node1.isInstanceOf[ProcessorOp[_ <: Task]] &&
+      !node2.isInstanceOf[ProcessorOp[_ <: Task]]) {
+      // outDegree is 1, so this is the single edge node1 -> node2.
+      val (_, edge, _) = graph.outgoingEdgesOf(node1).head
       if (edge == Direct) {
-        val opList = OpChain(node1.ops ++ node2.ops)
-        dag.addVertex(opList)
-        for (incomingEdge <- dag.incomingEdgesOf(node1)) {
-          dag.addEdge(incomingEdge._1, incomingEdge._2, opList)
+        val chainedOp = node1.chain(node2)
+        graph.addVertex(chainedOp)
+        for (incomingEdge <- graph.incomingEdgesOf(node1)) {
+          graph.addEdge(incomingEdge._1, incomingEdge._2, chainedOp)
         }
 
-        for (outgoingEdge <- dag.outgoingEdgesOf(node2)) {
-          dag.addEdge(opList, outgoingEdge._2, outgoingEdge._3)
+        for (outgoingEdge <- graph.outgoingEdgesOf(node2)) {
+          graph.addEdge(chainedOp, outgoingEdge._2, outgoingEdge._3)
         }
 
         // Remove the old vertex
-        dag.removeVertex(node1)
-        dag.removeVertex(node2)
+        graph.removeVertex(node1)
+        graph.removeVertex(node2)
       }
     }
-    dag
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
new file mode 100644
index 0000000..609fbb0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.plan.functions
+
+/**
+ * A serializable, composable per-element transformation used by the DSL tasks.
+ *
+ * @tparam IN  input element type
+ * @tparam OUT output element type
+ */
+trait SingleInputFunction[IN, OUT] extends Serializable {
+  /** Transforms one input element into zero or more output elements. */
+  def process(value: IN): TraversableOnce[OUT]
+  /** Returns a function that feeds every output of this function into `other`. */
+  def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
+    new AndThen(this, other)
+  }
+  /**
+   * Emits output that `process` held back (e.g. an accumulated reduction).
+   * The default emits nothing.
+   */
+  def finish(): TraversableOnce[OUT] = None
+  /** Discards accumulated state; the default is a no-op for stateless functions. */
+  def clearState(): Unit = {}
+  /** Human-readable description of this function. */
+  def description: String
+}
+
+/**
+ * Sequential composition: the outputs of `first` become the inputs of `second`.
+ *
+ * @param first  function applied to each input element
+ * @param second function applied to each output of `first`
+ */
+class AndThen[IN, MIDDLE, OUT](
+    first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
+  extends SingleInputFunction[IN, OUT] {
+
+  override def process(value: IN): TraversableOnce[OUT] = {
+    first.process(value).flatMap(second.process)
+  }
+
+  /**
+   * Pushes whatever `first` emits on finish through `second.process`. If that yields nothing,
+   * `second.finish()` is returned. Otherwise the pushed-through result is returned and
+   * `second.finish()` is not called.
+   * NOTE(review): `flatMap` on a TraversableOnce may be evaluated lazily. In that case the
+   * `second.process` calls for later elements only run when the result is traversed.
+   */
+  override def finish(): TraversableOnce[OUT] = {
+    val firstResult = first.finish().flatMap(second.process)
+    if (firstResult.isEmpty) {
+      second.finish()
+    } else {
+      firstResult
+    }
+  }
+
+  /** Clears the state of both composed functions. */
+  override def clearState(): Unit = {
+    first.clearState()
+    second.clearState()
+  }
+
+  /** Returns "first.second", or null if either description is null. */
+  override def description: String = {
+    Option(first.description).flatMap { description =>
+      Option(second.description).map(description + "." + _)
+    }.orNull
+  }
+}
+
+/**
+ * Applies `fn` to each input and emits all of its results. Stateless.
+ *
+ * @param fn                 function producing zero or more outputs per input
+ * @param descriptionMessage value returned by `description`
+ */
+class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String)
+  extends SingleInputFunction[IN, OUT] {
+
+  override def process(value: IN): TraversableOnce[OUT] = {
+    fn(value)
+  }
+
+  override def description: String = descriptionMessage
+}
+
+
+/**
+ * Folds all inputs with `fn` and emits the aggregate only from `finish()`.
+ *
+ * @param fn                 reduction applied as fn(accumulated, next)
+ * @param descriptionMessage value returned by `description`
+ */
+class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
+  extends SingleInputFunction[T, T] {
+
+  // Accumulated value; None until the first non-null input arrives.
+  private var state: Option[T] = None
+
+  /** Adds `value` to the aggregate and emits nothing. */
+  override def process(value: T): TraversableOnce[T] = {
+    if (state.isEmpty) {
+      // Option(null) is None, so a null first value is dropped rather than stored.
+      state = Option(value)
+    } else {
+      state = state.map(fn(_, value))
+    }
+    None
+  }
+
+  /** Emits the current aggregate, if any. The state is not reset here; see clearState(). */
+  override def finish(): TraversableOnce[T] = {
+    state
+  }
+
+  /** Forgets the aggregate so the next input starts a new reduction. */
+  override def clearState(): Unit = {
+    state = None
+  }
+
+  override def description: String = descriptionMessage
+}
+
+/**
+ * Terminal function that passes every value to `emit` and produces no output of its own.
+ *
+ * @param emit callback invoked once per processed value
+ */
+class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
+
+  override def process(value: T): TraversableOnce[Unit] = {
+    emit(value)
+    None
+  }
+
+  /** Always throws UnsupportedOperationException: nothing may follow an EmitFunction. */
+  override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = {
+    throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction")
+  }
+
+  override def description: String = ""
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
new file mode 100644
index 0000000..4ee2fa8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.window.api.CountWindowFn
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * This task triggers output on number of messages in a window.
+ *
+ * Every incoming message is passed to the WindowRunner. After every `windowSize` messages,
+ * where the size comes from the group's CountWindowFn, the runner is triggered and the
+ * counter restarts.
+ *
+ * @param groupBy      grouping and window definition; its window function must be a CountWindowFn
+ * @param windowRunner receives each message via `process` and is fired via `trigger`
+ * @param taskContext  task context
+ * @param userConfig   task configuration
+ */
+class CountTriggerTask[IN, GROUP](
+    groupBy: GroupAlsoByWindow[IN, GROUP],
+    windowRunner: WindowRunner,
+    taskContext: TaskContext,
+    userConfig: UserConfig)
+  extends Task(taskContext, userConfig) {
+
+  /** Uses a DefaultWindowRunner built from the task context, the config and `groupBy`. */
+  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
+      taskContext: TaskContext, userConfig: UserConfig) = {
+    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
+      taskContext, userConfig)
+  }
+
+  /**
+   * Reads the GroupAlsoByWindow stored under GEARPUMP_STREAMING_GROUPBY_FUNCTION.
+   * NOTE(review): `.get` throws if the config has no such entry.
+   */
+  def this(taskContext: TaskContext, userConfig: UserConfig) = {
+    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
+      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
+      taskContext, userConfig)
+  }
+
+  // Throws ClassCastException during construction if the window function is not a CountWindowFn.
+  private val windowSize = groupBy.window.windowFn.asInstanceOf[CountWindowFn].size
+  // Number of messages processed since the last trigger.
+  private var num = 0
+
+  /**
+   * Forwards `msg` to the window runner and triggers the runner once `windowSize` messages
+   * have arrived.
+   * NOTE(review): `num` is at least 1 when compared, so a non-positive size never triggers.
+   */
+  override def onNext(msg: Message): Unit = {
+    windowRunner.process(msg)
+    num += 1
+    if (windowSize == num) {
+      // NOTE(review): the trigger time is the window size itself, not an event timestamp.
+      // Presumably count windows are laid out on a message-count axis; confirm in WindowRunner.
+      windowRunner.trigger(Instant.ofEpochMilli(windowSize))
+      num = 0
+    }
+  }
+}