You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:43 UTC

[33/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
deleted file mode 100644
index 9f38cd7..0000000
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.pagerank
-
-import akka.actor.Actor.Receive
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
-import io.gearpump.experiments.pagerank.PageRankController.Tick
-import io.gearpump.experiments.pagerank.PageRankWorker.{LatestWeight, UpdateWeight}
-import io.gearpump.streaming.task.{Task, TaskContext, TaskId, TaskWrapper}
-import io.gearpump.util.Graph
-
-class PageRankWorker(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import taskContext.taskId
-
-  private var weight: Double = 1.0
-  private var upstreamWeights = Map.empty[TaskId, Double]
-
-  val taskCount = conf.getInt(PageRankApplication.COUNT).get
-  lazy val allTasks = (0 until taskCount).toList.map(TaskId(processorId = 1, _))
-
-  private val graph = conf.getValue[Graph[NodeWithTaskId[_], AnyRef]](PageRankApplication.DAG).get
-
-  private val node = graph.vertices.find { node =>
-    node.taskId == taskContext.taskId.index
-  }.get
-
-  private val downstream = graph.outgoingEdgesOf(node).map(_._3)
-    .map(id => taskId.copy(index = id.taskId)).toSeq
-  private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).length
-
-  LOG.info(s"downstream nodes: $downstream")
-
-  private var tick = 0
-
-  private def output(msg: AnyRef, tasks: TaskId*): Unit = {
-    taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*)
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case Tick(tick) =>
-      this.tick = tick
-
-      if (downstream.length == 0) {
-        // If there is no downstream, we will evenly distribute our page rank to
-        // every node in the graph
-        val update = UpdateWeight(taskId, weight / taskCount)
-        output(update, allTasks: _*)
-      } else {
-        val update = UpdateWeight(taskId, weight / downstream.length)
-        output(update, downstream: _*)
-      }
-    case update@UpdateWeight(upstreamTaskId, weight) =>
-      upstreamWeights += upstreamTaskId -> weight
-      if (upstreamWeights.size == upstreamCount) {
-        val nextWeight = upstreamWeights.foldLeft(0.0) { (sum, item) => sum + item._2 }
-        this.upstreamWeights = Map.empty[TaskId, Double]
-        this.weight = nextWeight
-        output(LatestWeight(taskId, weight, tick), TaskId(0, 0))
-      }
-  }
-}
-
-object PageRankWorker {
-  case class UpdateWeight(taskId: TaskId, weight: Double)
-  case class LatestWeight(taskId: TaskId, weight: Double, tick: Int)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
deleted file mode 100644
index 3877974..0000000
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.pagerank.example
-
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.experiments.pagerank.PageRankApplication
-import io.gearpump.util.Graph.Node
-import io.gearpump.util.{AkkaApp, Graph}
-
-/** A very simple PageRank example, Cyclic graph is not supported */
-object PageRankExample extends AkkaApp {
-
-  val a = "a"
-  val b = "b"
-  val c = "c"
-  val d = "d"
-
-  def help(): Unit = Unit
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    val pageRankGraph = Graph(a ~> b, a ~> c, a ~> d, b ~> a, b ~> d, d ~> b, d ~> c, c ~> b)
-    val app = new PageRankApplication("pagerank", iteration = 100, delta = 0.001, pageRankGraph)
-    val context = ClientContext(akkaConf)
-    val appId = context.submit(app)
-    context.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
new file mode 100644
index 0000000..023ee35
--- /dev/null
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.pagerank
+
+import akka.actor.ActorSystem
+
+import org.apache.gearpump.cluster.{Application, ApplicationMaster, UserConfig}
+import org.apache.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.appmaster.AppMaster
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph.Node
+
+/**
+ * A simple and naive pagerank implementation: one controller task plus one worker per vertex.
+ *
+ * @tparam T vertex type of the page rank graph
+ * @param name name of the application
+ * @param iteration max iteration count
+ * @param delta decide the accuracy when the page rank example stops.
+ * @param dag  the page rank graph
+ */
+class PageRankApplication[T](
+    override val name: String, iteration: Int, delta: Double, dag: Graph[T, _])
+  extends Application {
+
+  override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster]
+  override def userConfig(implicit system: ActorSystem): UserConfig = {
+    // Wrap every vertex in NodeWithTaskId; taskId counts up once per call of the mapVertex
+    // callback, so afterwards it equals the number of vertices that were mapped.
+    var taskId = 0
+    val pageRankDag = dag.mapVertex { node =>
+      val updatedNode = NodeWithTaskId(taskId, node)
+      taskId += 1
+      updatedNode
+    }
+
+    val taskCount = taskId // NOTE(review): correct only if mapVertex ran eagerly - confirm
+
+    val userConfig = UserConfig.empty.withValue(PageRankApplication.DAG, pageRankDag).
+      withInt(PageRankApplication.ITERATION, iteration).
+      withInt(PageRankApplication.COUNT, taskCount).
+      withDouble(PageRankApplication.DELTA, delta)
+    // Presumably 1 controller task and taskCount worker tasks (Processor's Int arg) - confirm.
+    val controller = Processor[PageRankController](1)
+    val pageRankWorker = Processor[PageRankWorker](taskCount)
+    val partitioner = new HashPartitioner
+
+    val app = StreamApplication(name, Graph(controller ~ partitioner ~> pageRankWorker), userConfig)
+    app.userConfig // NOTE(review): presumably also carries the processor graph - confirm
+  }
+}
+
+object PageRankApplication {
+  val DAG = "PageRank.DAG" // config key: the graph of NodeWithTaskId vertices
+  val ITERATION = "PageRank.Iteration" // config key: max iteration count
+  val COUNT = "PageRank.COUNT" // config key: number of numbered vertices / worker tasks
+  val DELTA = "PageRank.DELTA" // config key: the delta threshold
+  val REPORTER = "PageRank.Reporter" // not referenced by the code visible in this commit excerpt
+  case class NodeWithTaskId[T](taskId: Int, node: T) // a graph vertex plus its assigned task index
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
new file mode 100644
index 0000000..d461876
--- /dev/null
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.pagerank
+
+import akka.actor.Actor.Receive
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.pagerank.PageRankController.Tick
+import org.apache.gearpump.experiments.pagerank.PageRankWorker.LatestWeight
+import org.apache.gearpump.streaming.task._
+
+/** Drives the iteration: broadcasts a Tick per round and gathers each worker's LatestWeight. */
+class PageRankController(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+
+  val taskCount = conf.getInt(PageRankApplication.COUNT).get
+  val iterationMax = conf.getInt(PageRankApplication.ITERATION).get
+  val delta = conf.getDouble(PageRankApplication.DELTA).get
+
+  val tasks = (0 until taskCount).toList.map(TaskId(1, _))
+
+  var tick: Int = 0
+  var receivedWeightForCurrentTick = 0
+
+  var weights = Map.empty[TaskId, Double]
+  var deltas = Map.empty[TaskId, Double]
+
+  override def onStart(startTime: StartTime): Unit = output(Tick(tick), tasks: _*)
+
+  private def output(msg: AnyRef, tasks: TaskId*): Unit = {
+    taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*)
+  }
+
+  override def receiveUnManagedMessage: Receive = {
+    case LatestWeight(worker, latest, replyTick) =>
+      // A reply whose tick differs from the current one is ignored.
+      if (replyTick == tick) {
+        val previous = weights.getOrElse(worker, 0.0)
+        deltas = deltas.updated(worker, Math.abs(latest - previous))
+        weights = weights.updated(worker, latest)
+        receivedWeightForCurrentTick += 1
+        if (receivedWeightForCurrentTick == taskCount) finishRound()
+      }
+  }
+
+  /** Advances the tick, then either broadcasts the next Tick or logs the final state. */
+  private def finishRound(): Unit = {
+    tick += 1
+    receivedWeightForCurrentTick = 0
+    if (continueIteration) {
+      LOG.debug(s"next iteration: $tick, weight: $weights, delta: $deltas")
+      output(Tick(tick), tasks: _*)
+    } else {
+      LOG.info(s"iterations: $tick, weight: $weights, delta: $deltas")
+    }
+  }
+
+  /** True while the iteration cap is not reached and some recorded delta exceeds `delta`. */
+  private def continueIteration: Boolean = tick < iterationMax && deltas.values.exists(_ > delta)
+}
+
+object PageRankController {
+  case class Tick(iteration: Int) // sent by the controller to the workers with its current tick
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala
new file mode 100644
index 0000000..e033bf1
--- /dev/null
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.pagerank
+
+import akka.actor.Actor.Receive
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
+import org.apache.gearpump.experiments.pagerank.PageRankController.Tick
+import org.apache.gearpump.experiments.pagerank.PageRankWorker.{LatestWeight, UpdateWeight}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId, TaskWrapper}
+import org.apache.gearpump.util.Graph
+
+/**
+ * Worker for one page rank vertex: each Tick it sends shares of its weight downstream, and once
+ * a share from every upstream vertex arrived it reports the summed weight to TaskId(0, 0).
+ * NOTE(review): a vertex with no incoming edges can never satisfy that condition - confirm.
+ */
+class PageRankWorker(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  import taskContext.taskId
+
+  private var weight: Double = 1.0
+  private var upstreamWeights = Map.empty[TaskId, Double]
+  private var tick = 0
+
+  val taskCount = conf.getInt(PageRankApplication.COUNT).get
+  lazy val allTasks = (0 until taskCount).toList.map(TaskId(processorId = 1, _))
+
+  private val graph = conf.getValue[Graph[NodeWithTaskId[_], AnyRef]](PageRankApplication.DAG).get
+  private val node = graph.vertices.find { node =>
+    node.taskId == taskContext.taskId.index
+  }.get
+
+  private val downstream = graph.outgoingEdgesOf(node).map(_._3)
+    .map(id => taskId.copy(index = id.taskId)).toSeq
+  private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).length
+
+  LOG.info(s"downstream nodes: $downstream")
+
+  private def output(msg: AnyRef, tasks: TaskId*): Unit = {
+    taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*)
+  }
+
+  override def receiveUnManagedMessage: Receive = {
+    case Tick(currentTick) =>
+      this.tick = currentTick
+      if (downstream.isEmpty) {
+        // If there is no downstream, we will evenly distribute our page rank to
+        // every node in the graph
+        output(UpdateWeight(taskId, weight / taskCount), allTasks: _*)
+      } else {
+        output(UpdateWeight(taskId, weight / downstream.length), downstream: _*)
+      }
+    // Bound as upstreamWeight: naming it `weight` would shadow the field reported below.
+    case UpdateWeight(upstreamTaskId, upstreamWeight) =>
+      upstreamWeights += upstreamTaskId -> upstreamWeight
+      if (upstreamWeights.size == upstreamCount) {
+        this.weight = upstreamWeights.values.sum
+        this.upstreamWeights = Map.empty[TaskId, Double]
+        output(LatestWeight(taskId, this.weight, tick), TaskId(0, 0))
+      }
+  }
+}
+
+object PageRankWorker {
+  case class UpdateWeight(taskId: TaskId, weight: Double)
+  case class LatestWeight(taskId: TaskId, weight: Double, tick: Int)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/example/PageRankExample.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/example/PageRankExample.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/example/PageRankExample.scala
new file mode 100644
index 0000000..9dc311f
--- /dev/null
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/example/PageRankExample.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.pagerank.example
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.experiments.pagerank.PageRankApplication
+import org.apache.gearpump.util.Graph.Node
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/**
+ * A very simple PageRank example, Cyclic graph is not supported
+ * NOTE(review): the graph built in main has cycles (e.g. a ~> b ~> a) - confirm that claim.
+ */
+object PageRankExample extends AkkaApp {
+  val a = "a"
+  val b = "b"
+  val c = "c"
+  val d = "d"
+
+  def help(): Unit = ()
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val pageRankGraph = Graph(a ~> b, a ~> c, a ~> d, b ~> a, b ~> d, d ~> b, d ~> c, c ~> b)
+    val app = new PageRankApplication("pagerank", iteration = 100, delta = 0.001, pageRankGraph)
+    val context = ClientContext(akkaConf)
+    try context.submit(app) finally context.close() // close the client even if submit throws
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
deleted file mode 100644
index b517126..0000000
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.complexdag
-
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph.{Node => GraphNode}
-import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-
-case class Source_0(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf)
-case class Source_1(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf)
-case class Node_0(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_1(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_2(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_3(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_4(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Sink_0(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_1(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_2(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_3(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_4(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-
-/**
- * digraph flow {
- *   Source_0 -> Sink_0;
- *   Source_0 -> Sink_1;
- *   Source_0 -> Sink_2;
- *   Source_0 -> Node_1;
- *   Source_1 -> Node_0;
- *   Node_0 -> Sink_3;
- *   Node_1 -> Sink_3;
- *   Node_1 -> Sink_4;
- *   Node_1 -> Node_4;
- *   Node_2 -> Node_3;
- *   Node_1 -> Node_3;
- *   Source_0 -> Node_2;
- *   Source_0 -> Node_3;
- *   Node_3 -> Sink_3;
- *   Node_4 -> Sink_3;
- *   Source_1 -> Sink_4;
- * }
- */
-object Dag extends AkkaApp with ArgumentsParser {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  val RUN_FOR_EVER = -1
-
-  override val options: Array[(String, CLIOption[Any])] = Array.empty
-
-  def application(config: ParseResult): StreamApplication = {
-
-    val source_0 = Processor[Source_0](1)
-    val source_1 = Processor[Source_1](1)
-    val node_0 = Processor[Node_0](1)
-    val node_1 = Processor[Node_1](1)
-    val node_2 = Processor[Node_2](1)
-    val node_3 = Processor[Node_3](1)
-    val node_4 = Processor[Node_4](1)
-    val sink_0 = Processor[Sink_0](1)
-    val sink_1 = Processor[Sink_1](1)
-    val sink_2 = Processor[Sink_2](1)
-    val sink_3 = Processor[Sink_3](1)
-    val sink_4 = Processor[Sink_4](1)
-    val partitioner = new HashPartitioner
-    val app = StreamApplication("dag", Graph(
-      source_0 ~ partitioner ~> sink_1,
-      source_0 ~ partitioner ~> sink_2,
-      source_0 ~ partitioner ~> node_2,
-      source_0 ~ partitioner ~> node_3,
-      source_0 ~ partitioner ~> node_1,
-      source_0 ~ partitioner ~> sink_0,
-      node_2 ~ partitioner ~> node_3,
-      node_1 ~ partitioner ~> node_3,
-      node_1 ~ partitioner ~> sink_3,
-      node_1 ~ partitioner ~> node_4,
-      source_1 ~ partitioner ~> sink_4,
-      source_1 ~ partitioner ~> node_0,
-      node_3 ~ partitioner ~> sink_3,
-      node_4 ~ partitioner ~> sink_3,
-      node_0 ~ partitioner ~> sink_3
-    ), UserConfig.empty)
-    app
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val userConf = parse(args)
-    val context = ClientContext(akkaConf)
-    val appId = context.submit(application(userConf))
-    context.close()
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
deleted file mode 100644
index dfad6c8..0000000
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.complexdag
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import taskContext.output
-
-  override def onStart(startTime: StartTime): Unit = {}
-
-  override def onNext(msg: Message): Unit = {
-    val list = msg.msg.asInstanceOf[Vector[String]]
-    output(new Message(list :+ getClass.getCanonicalName, System.currentTimeMillis()))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
deleted file mode 100644
index b091a17..0000000
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.complexdag
-
-import scala.collection.mutable
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
-  var list = mutable.MutableList[String]()
-
-  override def onStart(startTime: StartTime): Unit = {
-    list += getClass.getCanonicalName
-  }
-
-  override def onNext(msg: Message): Unit = {
-    val l = msg.msg.asInstanceOf[Vector[String]]
-    list.size match {
-      case 1 =>
-        l.foreach(f => {
-          list += f
-        })
-      case _ =>
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
deleted file mode 100644
index df656ac..0000000
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.complexdag
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import taskContext.output
-
-  override def onStart(startTime: StartTime): Unit = {
-    self ! Message("start")
-  }
-
-  override def onNext(msg: Message): Unit = {
-    val list = Vector(getClass.getCanonicalName)
-    output(new Message(list, System.currentTimeMillis))
-    self ! Message("continue", System.currentTimeMillis())
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
new file mode 100644
index 0000000..3b6ceb8
--- /dev/null
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph.{Node => GraphNode}
+import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
+
+case class Source_0(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf)
+case class Source_1(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf)
+case class Node_0(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_1(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_2(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_3(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_4(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Sink_0(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_1(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_2(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_3(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_4(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+
+/**
+ * Builds and submits a fixed 12-processor graph; every edge uses the same HashPartitioner.
+ *
+ * digraph flow {
+ *   Source_0 -> Sink_0;
+ *   Source_0 -> Sink_1;
+ *   Source_0 -> Sink_2;
+ *   Source_0 -> Node_1;
+ *   Source_1 -> Node_0;
+ *   Node_0 -> Sink_3;
+ *   Node_1 -> Sink_3;
+ *   Node_1 -> Node_4;
+ *   Node_2 -> Node_3;
+ *   Node_1 -> Node_3;
+ *   Source_0 -> Node_2;
+ *   Source_0 -> Node_3;
+ *   Node_3 -> Sink_3;
+ *   Node_4 -> Sink_3;
+ *   Source_1 -> Sink_4;
+ * }
+ */
+object Dag extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  val RUN_FOR_EVER = -1
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  def application(config: ParseResult): StreamApplication = {
+    // One Processor per vertex, each created with argument 1 (presumably its parallelism).
+    val source_0 = Processor[Source_0](1)
+    val source_1 = Processor[Source_1](1)
+    val node_0 = Processor[Node_0](1)
+    val node_1 = Processor[Node_1](1)
+    val node_2 = Processor[Node_2](1)
+    val node_3 = Processor[Node_3](1)
+    val node_4 = Processor[Node_4](1)
+    val sink_0 = Processor[Sink_0](1)
+    val sink_1 = Processor[Sink_1](1)
+    val sink_2 = Processor[Sink_2](1)
+    val sink_3 = Processor[Sink_3](1)
+    val sink_4 = Processor[Sink_4](1)
+    val partitioner = new HashPartitioner
+    StreamApplication("dag", Graph(
+      source_0 ~ partitioner ~> sink_1,
+      source_0 ~ partitioner ~> sink_2,
+      source_0 ~ partitioner ~> node_2,
+      source_0 ~ partitioner ~> node_3,
+      source_0 ~ partitioner ~> node_1,
+      source_0 ~ partitioner ~> sink_0,
+      node_2 ~ partitioner ~> node_3,
+      node_1 ~ partitioner ~> node_3,
+      node_1 ~ partitioner ~> sink_3,
+      node_1 ~ partitioner ~> node_4,
+      source_1 ~ partitioner ~> sink_4,
+      source_1 ~ partitioner ~> node_0,
+      node_3 ~ partitioner ~> sink_3,
+      node_4 ~ partitioner ~> sink_3,
+      node_0 ~ partitioner ~> sink_3
+    ), UserConfig.empty)
+  }
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val userConf = parse(args)
+    val context = ClientContext(akkaConf)
+    try context.submit(application(userConf))
+    finally context.close() // always release the client, even when submission throws
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
new file mode 100644
index 0000000..8d163f9
--- /dev/null
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+/** Appends this task's class name to the incoming Vector[String] and emits the result. */
+class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+
+  override def onStart(startTime: StartTime): Unit = ()
+
+  override def onNext(msg: Message): Unit = {
+    val visited = msg.msg.asInstanceOf[Vector[String]] :+ getClass.getCanonicalName
+    taskContext.output(new Message(visited, System.currentTimeMillis()))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
new file mode 100644
index 0000000..8dfa565
--- /dev/null
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+import scala.collection.mutable
+
+/** Task with no output: it keeps the names carried by a message in `list`. */
+class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+
+  // Receives this class's name in onStart; onNext may extend it afterwards.
+  var list = mutable.MutableList[String]()
+
+  override def onStart(startTime: StartTime): Unit = {
+    list += getClass.getCanonicalName
+  }
+
+  // The payload is appended only while `list` holds exactly one element;
+  // with an empty list, or once it has grown past one entry, the message is dropped.
+  override def onNext(msg: Message): Unit = {
+    val path = msg.msg.asInstanceOf[Vector[String]]
+    if (list.size == 1) {
+      path.foreach(list += _)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
new file mode 100644
index 0000000..0359519
--- /dev/null
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+/** On every received message, emits a one-element Vector holding this class's name. */
+class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+
+  // Sends the first message to itself so that emission begins.
+  override def onStart(startTime: StartTime): Unit = self ! Message("start")
+
+  override def onNext(msg: Message): Unit = {
+    val origin = Vector(getClass.getCanonicalName)
+    taskContext.output(new Message(origin, System.currentTimeMillis))
+    // Self-addressed follow-up; presumably delivered back to onNext to keep the loop going.
+    self ! Message("continue", System.currentTimeMillis())
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
deleted file mode 100644
index b142d8d..0000000
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.complexdag
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest._
-import org.scalatest.prop.PropertyChecks
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class DagSpec extends PropSpec with PropertyChecks
-  with Matchers with BeforeAndAfterAll with MasterHarness {
-
-  override def beforeAll {
-    startActorSystem()
-  }
-
-  override def afterAll {
-    shutdownActorSystem()
-  }
-
-  protected override def config = TestUtil.DEFAULT_CONFIG
-
-  property("Dag should succeed to submit application with required arguments") {
-    val requiredArgs = Array.empty[String]
-
-    val masterReceiver = createMockMaster()
-    val args = requiredArgs
-
-    Future {
-      Dag.main(masterConfig, args)
-    }
-    masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
-    masterReceiver.reply(SubmitApplicationResult(Success(0)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
deleted file mode 100644
index 35c5824..0000000
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.complexdag
-
-import org.mockito.Mockito._
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.MockUtil._
-
-class NodeSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
-
-  val context = MockUtil.mockTaskContext
-
-  val node = new Node(context, UserConfig.empty)
-
-  property("Node should send a Vector[String](classOf[Node].getCanonicalName, " +
-    "classOf[Node].getCanonicalName") {
-    val list = Vector(classOf[Node].getCanonicalName)
-    val expected = Vector(classOf[Node].getCanonicalName, classOf[Node].getCanonicalName)
-    node.onNext(Message(list))
-    verify(context).output(argMatch[Message](_.msg == expected))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
deleted file mode 100644
index 341f6c6..0000000
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.complexdag
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-
-class SinkSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
-
-  val context = MockUtil.mockTaskContext
-
-  val sink = new Sink(context, UserConfig.empty)
-
-  property("Sink should send a Vector[String](classOf[Sink].getCanonicalName, " +
-    "classOf[Sink].getCanonicalName") {
-    val list = Vector(classOf[Sink].getCanonicalName)
-    val expected = Vector(classOf[Sink].getCanonicalName, classOf[Sink].getCanonicalName)
-    sink.onNext(Message(list))
-
-    (0 until sink.list.size).map(i => {
-      assert(sink.list(i).equals(expected(i)))
-    })
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
deleted file mode 100644
index faa7aa7..0000000
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.complexdag
-
-import akka.actor.ActorSystem
-import org.mockito.Mockito._
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.MockUtil._
-
-class SourceSpec extends WordSpec with Matchers {
-
-  "Source" should {
-    "Source should send a msg of Vector[String](classOf[Source].getCanonicalName)" in {
-      val system1 = ActorSystem("Source", TestUtil.DEFAULT_CONFIG)
-
-      val system2 = ActorSystem("Reporter", TestUtil.DEFAULT_CONFIG)
-
-      val context = MockUtil.mockTaskContext
-
-      val source = new Source(context, UserConfig.empty)
-      source.onNext(Message("start"))
-
-      verify(context).output(argMatch[Message](Vector(classOf[Source].getCanonicalName) == _.msg))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/DagSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/DagSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/DagSpec.scala
new file mode 100644
index 0000000..cf8ae63
--- /dev/null
+++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/DagSpec.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.scalatest._
+import org.scalatest.prop.PropertyChecks
+
+import scala.concurrent.Future
+import scala.util.Success
+
+/** Checks that Dag.main sends a SubmitApplication request to a mock master. */
+class DagSpec extends PropSpec with PropertyChecks
+  with Matchers with BeforeAndAfterAll with MasterHarness {
+
+  override def beforeAll: Unit = startActorSystem()
+
+  override def afterAll: Unit = shutdownActorSystem()
+
+  protected override def config = TestUtil.DEFAULT_CONFIG
+
+  property("Dag should succeed to submit application with required arguments") {
+    // An empty argument array is all this test passes to Dag.
+    val noArgs = Array.empty[String]
+    val mockMaster = createMockMaster()
+
+    // Run main asynchronously so that this thread stays free to play
+    // the master's side of the exchange.
+    Future {
+      Dag.main(masterConfig, noArgs)
+    }
+
+    // The submission must arrive within PROCESS_BOOT_TIME; then reply with a successful result.
+    mockMaster.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+    mockMaster.reply(SubmitApplicationResult(Success(0)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
new file mode 100644
index 0000000..241e0f6
--- /dev/null
+++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.MockUtil._
+import org.mockito.Mockito._
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+/** Verifies that Node emits the incoming vector with its own class name appended. */
+class NodeSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
+
+  val context = MockUtil.mockTaskContext
+  val node = new Node(context, UserConfig.empty)
+
+  property("Node should send a Vector[String](classOf[Node].getCanonicalName, " +
+    "classOf[Node].getCanonicalName") {
+    val nodeName = classOf[Node].getCanonicalName
+    node.onNext(Message(Vector(nodeName)))
+    val expected = Vector(nodeName, nodeName)
+    verify(context).output(argMatch[Message](_.msg == expected))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SinkSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SinkSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SinkSpec.scala
new file mode 100644
index 0000000..e7bed30
--- /dev/null
+++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SinkSpec.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+
+/** Checks that a Sink holding its start entry records the names carried by a message. */
+class SinkSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
+
+  val context = MockUtil.mockTaskContext
+  val sink = new Sink(context, UserConfig.empty)
+
+  property("Sink should send a Vector[String](classOf[Sink].getCanonicalName, " +
+    "classOf[Sink].getCanonicalName") {
+    val sinkName = classOf[Sink].getCanonicalName
+    // onNext records only while the list holds exactly the one entry onStart would add,
+    // so seed it here; with an empty list onNext is a no-op and nothing would be checked.
+    sink.list += sinkName
+    sink.onNext(Message(Vector(sinkName)))
+
+    sink.list.toVector shouldBe Vector(sinkName, sinkName)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
new file mode 100644
index 0000000..20cad1c
--- /dev/null
+++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.complexdag
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.MockUtil._
+import org.mockito.Mockito._
+import org.scalatest.{Matchers, WordSpec}
+
+/** Verifies that one onNext call makes Source emit a Vector holding only its class name. */
+class SourceSpec extends WordSpec with Matchers {
+
+  "Source" should {
+    "Source should send a msg of Vector[String](classOf[Source].getCanonicalName)" in {
+      // NOTE(review): system1/system2 are never used or shut down here; confirm the need.
+      val system1 = ActorSystem("Source", TestUtil.DEFAULT_CONFIG)
+      val system2 = ActorSystem("Reporter", TestUtil.DEFAULT_CONFIG)
+
+      val context = MockUtil.mockTaskContext
+      val source = new Source(context, UserConfig.empty)
+      source.onNext(Message("start"))
+      val expected = Vector(classOf[Source].getCanonicalName)
+      verify(context).output(argMatch[Message](expected == _.msg))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/README.md b/examples/streaming/fsio/README.md
index 06fefb6..200cdb1 100644
--- a/examples/streaming/fsio/README.md
+++ b/examples/streaming/fsio/README.md
@@ -16,7 +16,7 @@ In order to run the example:
 
   3. Submit the application:<br>
   ```bash
-  ./target/pack/bin/gear app -jar ./examples/target/$SCALA_VERSION_MAJOR/gearpump-examples-assembly-$VERSION.jar io.gearpump.streaming.examples.sol.SOL -input $INPUT_FILE_PATH -output $OUTPUT_DIRECTORY
+  ./target/pack/bin/gear app -jar ./examples/target/$SCALA_VERSION_MAJOR/gearpump-examples-assembly-$VERSION.jar org.apache.gearpump.streaming.examples.sol.SOL -input $INPUT_FILE_PATH -output $OUTPUT_DIRECTORY
   ```
   4. Stop the application:<br>
   ```bash

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
deleted file mode 100644
index 3b53c9c..0000000
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.fsio
-
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
-import scala.language.implicitConversions
-
-import org.apache.hadoop.conf.Configuration
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.util.Constants._
-
-class HadoopConfig(config: UserConfig) {
-
-  def withHadoopConf(conf: Configuration): UserConfig = {
-    config.withBytes(HADOOP_CONF, serializeHadoopConf(conf))
-  }
-
-  def hadoopConf: Configuration = deserializeHadoopConf(config.getBytes(HADOOP_CONF).get)
-
-  private def serializeHadoopConf(conf: Configuration): Array[Byte] = {
-    val out = new ByteArrayOutputStream()
-    val dataOut = new DataOutputStream(out)
-    conf.write(dataOut)
-    dataOut.close()
-    out.toByteArray
-  }
-
-  private def deserializeHadoopConf(bytes: Array[Byte]): Configuration = {
-    val in = new ByteArrayInputStream(bytes)
-    val dataIn = new DataInputStream(in)
-    val result = new Configuration()
-    result.readFields(dataIn)
-    dataIn.close()
-    result
-  }
-}
-
-object HadoopConfig {
-  def empty: HadoopConfig = new HadoopConfig(UserConfig.empty)
-  def apply(config: UserConfig): HadoopConfig = new HadoopConfig(config)
-
-  implicit def userConfigToHadoopConfig(userConf: UserConfig): HadoopConfig = {
-    HadoopConfig(userConf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
deleted file mode 100644
index 13fc3f9..0000000
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.fsio
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
-
-import akka.actor.Cancellable
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.SequenceFile._
-import org.apache.hadoop.io.{SequenceFile, Text}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.examples.fsio.HadoopConfig._
-import io.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
-  extends Task(taskContext, config) {
-
-  import taskContext.taskId
-
-  val outputPath = new Path(config.getString(OUTPUT_PATH).get + File.separator + taskId)
-  var writer: SequenceFile.Writer = null
-  val textClass = new Text().getClass
-  val key = new Text()
-  val value = new Text()
-  val hadoopConf = config.hadoopConf
-
-  private var msgCount: Long = 0
-  private var snapShotKVCount: Long = 0
-  private var snapShotTime: Long = 0
-  private var scheduler: Cancellable = null
-
-  override def onStart(startTime: StartTime): Unit = {
-
-    val fs = FileSystem.get(hadoopConf)
-    fs.deleteOnExit(outputPath)
-    writer = SequenceFile.createWriter(hadoopConf, Writer.file(outputPath),
-      Writer.keyClass(textClass), Writer.valueClass(textClass))
-
-    scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
-      new FiniteDuration(5, TimeUnit.SECONDS))(reportStatus())
-    snapShotTime = System.currentTimeMillis()
-    LOG.info("sequence file bolt initiated")
-  }
-
-  override def onNext(msg: Message): Unit = {
-    val kv = msg.msg.asInstanceOf[String].split("\\+\\+")
-    if (kv.length >= 2) {
-      key.set(kv(0))
-      value.set(kv(1))
-      writer.append(key, value)
-    }
-    msgCount += 1
-  }
-
-  override def onStop(): Unit = {
-    if (scheduler != null) {
-      scheduler.cancel()
-    }
-    writer.close()
-    LOG.info("sequence file bolt stopped")
-  }
-
-  private def reportStatus() = {
-    val current: Long = System.currentTimeMillis()
-    LOG.info(s"Task $taskId Throughput: ${
-      (msgCount - snapShotKVCount,
-        (current - snapShotTime) / 1000)
-    } (KVPairs, second)")
-    snapShotKVCount = msgCount
-    snapShotTime = current
-  }
-}
-
-object SeqFileStreamProcessor {
-  val OUTPUT_PATH = "outputpath"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
deleted file mode 100644
index f9b0d22..0000000
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.fsio
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.SequenceFile._
-import org.apache.hadoop.io.{SequenceFile, Text}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.examples.fsio.HadoopConfig._
-import io.gearpump.streaming.examples.fsio.SeqFileStreamProducer._
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
-  extends Task(taskContext, config) {
-
-  import taskContext.output
-
-  val value = new Text()
-  val key = new Text()
-  var reader: SequenceFile.Reader = null
-  val hadoopConf = config.hadoopConf
-  val fs = FileSystem.get(hadoopConf)
-  val inputPath = new Path(config.getString(INPUT_PATH).get)
-
-  override def onStart(startTime: StartTime): Unit = {
-    reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
-    self ! Start
-    LOG.info("sequence file spout initiated")
-  }
-
-  override def onNext(msg: Message): Unit = {
-    if (reader.next(key, value)) {
-      output(Message(key + "++" + value))
-    } else {
-      reader.close()
-      reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
-    }
-    self ! Continue
-  }
-
-  override def onStop(): Unit = {
-    reader.close()
-  }
-}
-
-object SeqFileStreamProducer {
-  def INPUT_PATH: String = "inputpath"
-
-  val Start = Message("start")
-  val Continue = Message("continue")
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
deleted file mode 100644
index 7272f2b..0000000
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.fsio
-
-import org.apache.hadoop.conf.Configuration
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.ShufflePartitioner
-import io.gearpump.streaming.examples.fsio.HadoopConfig._
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph._
-import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-
-object SequenceFileIO extends AkkaApp with ArgumentsParser {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "source" -> CLIOption[Int]("<sequence file reader number>", required = false,
-      defaultValue = Some(1)),
-    "sink" -> CLIOption[Int]("<sequence file writer number>", required = false,
-      defaultValue = Some(1)),
-    "input" -> CLIOption[String]("<input file path>", required = true),
-    "output" -> CLIOption[String]("<output file directory>", required = true)
-  )
-
-  def application(config: ParseResult): StreamApplication = {
-    val spoutNum = config.getInt("source")
-    val boltNum = config.getInt("sink")
-    val input = config.getString("input")
-    val output = config.getString("output")
-    val appConfig = UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, input)
-      .withString(SeqFileStreamProcessor.OUTPUT_PATH, output)
-    val hadoopConfig = appConfig.withHadoopConf(new Configuration())
-    val partitioner = new ShufflePartitioner()
-    val streamProducer = Processor[SeqFileStreamProducer](spoutNum)
-    val streamProcessor = Processor[SeqFileStreamProcessor](boltNum)
-
-    val app = StreamApplication("SequenceFileIO",
-      Graph(streamProducer ~ partitioner ~> streamProcessor), hadoopConfig)
-    app
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-    val context = ClientContext(akkaConf)
-    val appId = context.submit(application(config))
-    context.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfig.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfig.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfig.scala
new file mode 100644
index 0000000..144dd78
--- /dev/null
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfig.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.fsio
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+import scala.language.implicitConversions
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.util.Constants._
+
+class HadoopConfig(config: UserConfig) {
+
+  // Returns a copy of the wrapped UserConfig carrying `conf` as bytes under HADOOP_CONF.
+  def withHadoopConf(conf: Configuration): UserConfig =
+    config.withBytes(HADOOP_CONF, serializeHadoopConf(conf))
+
+  // Rebuilds the Configuration stored under HADOOP_CONF; the `.get` is unguarded (TODO confirm).
+  def hadoopConf: Configuration = deserializeHadoopConf(config.getBytes(HADOOP_CONF).get)
+
+  private def serializeHadoopConf(conf: Configuration): Array[Byte] = {
+    val buffer = new ByteArrayOutputStream()
+    val sink = new DataOutputStream(buffer)
+    conf.write(sink)
+    sink.close() // close the data stream before taking the byte snapshot
+    buffer.toByteArray
+  }
+
+  private def deserializeHadoopConf(bytes: Array[Byte]): Configuration = {
+    val source = new DataInputStream(new ByteArrayInputStream(bytes))
+    val restored = new Configuration()
+    restored.readFields(source)
+    source.close()
+    restored
+  }
+}
+
+object HadoopConfig {
+  def empty: HadoopConfig = apply(UserConfig.empty)
+  def apply(config: UserConfig): HadoopConfig = new HadoopConfig(config)
+
+  // Implicit view so `withHadoopConf` / `hadoopConf` can be called directly on a UserConfig.
+  implicit def userConfigToHadoopConfig(userConf: UserConfig): HadoopConfig =
+    new HadoopConfig(userConf)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
new file mode 100644
index 0000000..2e4a556
--- /dev/null
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.fsio
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.Cancellable
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.SequenceFile._
+import org.apache.hadoop.io.{SequenceFile, Text}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
+import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
+  extends Task(taskContext, config) {
+  // Writes each incoming "key++value" message as a Text pair into a per-task SequenceFile.
+  import taskContext.taskId
+
+  val outputPath = new Path(config.getString(OUTPUT_PATH).get + File.separator + taskId)
+  var writer: SequenceFile.Writer = null // opened in onStart, closed in onStop
+  val textClass = new Text().getClass
+  val key = new Text()
+  val value = new Text()
+  val hadoopConf = config.hadoopConf
+
+  private var msgCount: Long = 0
+  private var snapShotKVCount: Long = 0 // msgCount as of the previous reportStatus call
+  private var snapShotTime: Long = 0 // millis at onStart or at the previous reportStatus call
+  private var scheduler: Cancellable = null
+
+  override def onStart(startTime: StartTime): Unit = {
+    // NOTE(review): deleteOnExit marks outputPath for deletion at exit — confirm that is intended.
+    val fs = FileSystem.get(hadoopConf)
+    fs.deleteOnExit(outputPath)
+    writer = SequenceFile.createWriter(hadoopConf, Writer.file(outputPath),
+      Writer.keyClass(textClass), Writer.valueClass(textClass))
+
+    scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
+      new FiniteDuration(5, TimeUnit.SECONDS))(reportStatus())
+    snapShotTime = System.currentTimeMillis()
+    LOG.info("sequence file bolt initiated")
+  }
+
+  override def onNext(msg: Message): Unit = {
+    // Payloads that do not split into at least two parts on "++" are counted but not written.
+    val kv = msg.msg.asInstanceOf[String].split("\\+\\+")
+    if (kv.length >= 2) {
+      key.set(kv(0))
+      value.set(kv(1))
+      writer.append(key, value)
+    }
+    msgCount += 1
+  }
+
+  override def onStop(): Unit = {
+    // Both handles are still null if onStart never ran or failed part-way; skip them in that case.
+    if (scheduler != null) scheduler.cancel()
+    if (writer != null) writer.close()
+    LOG.info("sequence file bolt stopped")
+  }
+
+  private def reportStatus() = {
+    val current: Long = System.currentTimeMillis()
+    LOG.info(s"Task $taskId Throughput: ${
+      (msgCount - snapShotKVCount,
+        (current - snapShotTime) / 1000)
+    } (KVPairs, second)")
+    snapShotKVCount = msgCount
+    snapShotTime = current
+  }
+}
+
+object SeqFileStreamProcessor {
+  val OUTPUT_PATH = "outputpath" // UserConfig key under which the output directory is stored
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
new file mode 100644
index 0000000..02d2434
--- /dev/null
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.fsio
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.SequenceFile._
+import org.apache.hadoop.io.{SequenceFile, Text}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
+import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProducer._
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
+  extends Task(taskContext, config) {
+  // Reads a SequenceFile in a loop and emits every record downstream as a "key++value" string.
+  import taskContext.output
+
+  val value = new Text()
+  val key = new Text()
+  var reader: SequenceFile.Reader = null // opened in onStart, reopened at end of file
+  val hadoopConf = config.hadoopConf
+  val fs = FileSystem.get(hadoopConf)
+  val inputPath = new Path(config.getString(INPUT_PATH).get)
+
+  override def onStart(startTime: StartTime): Unit = {
+    reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
+    self ! Start
+    LOG.info("sequence file spout initiated")
+  }
+
+  override def onNext(msg: Message): Unit = {
+    if (reader.next(key, value)) {
+      output(Message(s"${key}++${value}")) // interpolation avoids the deprecated any2stringadd `+`
+    } else {
+      reader.close() // no more records: reopen the file so the input is replayed from the top
+      reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
+    }
+    self ! Continue
+  }
+
+  override def onStop(): Unit = {
+    if (reader != null) reader.close() // reader stays null when onStart never opened it
+  }
+}
+
+object SeqFileStreamProducer {
+  def INPUT_PATH: String = "inputpath" // UserConfig key holding the input file path
+
+  val Start = Message("start") // sent to self once from onStart
+  val Continue = Message("continue") // sent to self at the end of every onNext
+}