Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:51 UTC

[41/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
index fcdbf14..5bafef1 100644
--- a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
+++ b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,23 +17,25 @@
  */
 package io.gearpump.experiments.distributeservice
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import akka.actor.ActorSystem
 import akka.testkit.{TestActorRef, TestProbe}
-import io.gearpump.WorkerId
-import io.gearpump.cluster.AppMasterToMaster.{RequestResource, GetAllWorkers, RegisterAppMaster}
+import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
+
+import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource}
 import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import io.gearpump.cluster.MasterToAppMaster.{ResourceAllocated, WorkerList, AppMasterRegistered}
-import io.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, AppMasterRuntimeEnvironment}
-import io.gearpump.cluster.{AppMasterContext, UserConfig, AppDescription, TestUtil}
-import io.gearpump.cluster.scheduler.{ResourceAllocation, Relaxation, ResourceRequest, Resource}
-import DistServiceAppMaster.{FileContainer, GetFileContainer}
+import io.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList}
+import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
+import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
+import io.gearpump.cluster.{AppDescription, AppMasterContext, TestUtil, UserConfig}
+import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer}
 import io.gearpump.util.ActorSystemBooter.RegisterActorSystem
 import io.gearpump.util.ActorUtil
-import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
-
-import scala.concurrent.duration._
 
-class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter{
+class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter {
   implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG)
   val mockMaster = TestProbe()(system)
   val mockWorker1 = TestProbe()(system)
@@ -45,37 +47,41 @@ class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfte
   val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L))
   val resource = Resource(1)
   val appJar = None
-  val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName, UserConfig.empty)
+  val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName,
+    UserConfig.empty)
 
   "DistService AppMaster" should {
     "responsable for service distributing" in {
       val appMasterInfo = AppMasterRuntimeInfo(appId, "appName", mockWorker1.ref)
-      val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy, appMasterInfo)
+      val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy,
+        appMasterInfo)
       TestActorRef[DistServiceAppMaster](
         AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext))
-      val registerAppMaster = mockMaster.receiveOne(15 seconds)
+      val registerAppMaster = mockMaster.receiveOne(15.seconds)
       assert(registerAppMaster.isInstanceOf[RegisterAppMaster])
 
       val appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster
       mockMaster.reply(AppMasterRegistered(appId))
-      //The DistributedShell AppMaster will ask for worker list
+      // The DistributedShell AppMaster will ask for worker list
       mockMaster.expectMsg(GetAllWorkers)
       mockMaster.reply(WorkerList(workerList))
-      //After worker list is ready, DistributedShell AppMaster will request resouce on each worker
+      // After worker list is ready, DistributedShell AppMaster will request resouce on each worker
       workerList.foreach { workerId =>
-        mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)))
+        mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId,
+          relaxation = Relaxation.SPECIFICWORKER)))
       }
-      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L)))))
+      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref,
+        WorkerId(1, 0L)))))
       mockWorker1.expectMsgClass(classOf[LaunchExecutor])
       mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
 
       appMaster.tell(GetFileContainer, client.ref)
-      client.expectMsgClass(15 seconds, classOf[FileContainer])
+      client.expectMsgClass(15.seconds, classOf[FileContainer])
     }
   }
 
   after {
-    system.shutdown()
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }
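
The teardown change in this spec follows the Akka 2.4 shutdown idiom: the deprecated system.shutdown()/awaitTermination() pair is replaced by terminate() plus a blocking wait on whenTerminated. A minimal standalone sketch of that idiom (the system name "demo" is illustrative):

    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    import akka.actor.ActorSystem

    object ShutdownSketch {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("demo")
        // terminate() is asynchronous and returns a Future[Terminated];
        // blocking on whenTerminated mirrors the old awaitTermination()
        system.terminate()
        Await.result(system.whenTerminated, Duration.Inf)
      }
    }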

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
index 2f9928c..2e37091 100644
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
+++ b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,12 @@
 package io.gearpump.experiments.pagerank
 
 import akka.actor.ActorSystem
-import io.gearpump.streaming.{StreamApplication, Processor}
-import io.gearpump.streaming.appmaster.AppMaster
+
 import io.gearpump.cluster.{Application, ApplicationMaster, UserConfig}
-import PageRankApplication.NodeWithTaskId
+import io.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
 import io.gearpump.partitioner.HashPartitioner
+import io.gearpump.streaming.appmaster.AppMaster
+import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.util.Graph
 import io.gearpump.util.Graph.Node
 
@@ -30,21 +31,19 @@ import io.gearpump.util.Graph.Node
  *
  * A simple and naive pagerank implementation.
  *
- * We will continue to optimize this to able to run page rank of tens of millions of nodes
- *
  * @param name name of the application
  * @param iteration max iteration count
  * @param delta decide the accuracy when the page rank example stops.
  * @param dag  the page rank graph
- * @tparam T
  */
-class PageRankApplication[T] (override val name : String,  iteration: Int, delta: Double, dag: Graph[T, _])
+class PageRankApplication[T](
+    override val name: String, iteration: Int, delta: Double, dag: Graph[T, _])
   extends Application {
 
   override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster]
   override def userConfig(implicit system: ActorSystem): UserConfig = {
 
-    // map node with taskId
+    // Map node with taskId
     var taskId = 0
     val pageRankDag = dag.mapVertex { node =>
       val updatedNode = NodeWithTaskId(taskId, node)
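
Most hunks in this commit apply the same import convention seen here: relative imports are expanded to fully qualified ones, and the imports are regrouped into scala/java standard library first, then third-party libraries, then io.gearpump packages, with each group alphabetized and separated by a blank line. A schematic sketch of the ordering (the particular imports are illustrative):

    // 1. scala / java standard library
    import scala.concurrent.duration._

    // 2. third-party libraries
    import akka.actor.ActorSystem
    import org.slf4j.Logger

    // 3. project packages (io.gearpump.*), fully qualified rather than relative
    import io.gearpump.cluster.UserConfig
    import io.gearpump.util.Graph

    object ImportOrderSketch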

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
index 8771a40..0fb689d 100644
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
+++ b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,12 +18,14 @@
 package io.gearpump.experiments.pagerank
 
 import akka.actor.Actor.Receive
-import io.gearpump.streaming.task._
+
 import io.gearpump.cluster.UserConfig
-import PageRankController.Tick
-import PageRankWorker.LatestWeight
+import io.gearpump.experiments.pagerank.PageRankController.Tick
+import io.gearpump.experiments.pagerank.PageRankWorker.LatestWeight
+import io.gearpump.streaming.task._
 
-class PageRankController (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+class PageRankController(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
 
   val taskCount = conf.getInt(PageRankApplication.COUNT).get
   val iterationMax = conf.getInt(PageRankApplication.ITERATION).get
@@ -37,11 +39,11 @@ class PageRankController (taskContext : TaskContext, conf: UserConfig) extends T
   var weights = Map.empty[TaskId, Double]
   var deltas = Map.empty[TaskId, Double]
 
-  override def onStart(startTime : StartTime) : Unit = {
+  override def onStart(startTime: StartTime): Unit = {
     output(Tick(tick), tasks: _*)
   }
 
-  private def output(msg: AnyRef, tasks: TaskId *): Unit = {
+  private def output(msg: AnyRef, tasks: TaskId*): Unit = {
     taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*)
   }
 
@@ -49,7 +51,7 @@ class PageRankController (taskContext : TaskContext, conf: UserConfig) extends T
     case LatestWeight(taskId, weight, replyTick) =>
       if (this.tick == replyTick) {
 
-        deltas +=  taskId -> Math.abs(weight - weights.getOrElse(taskId, 0.0))
+        deltas += taskId -> Math.abs(weight - weights.getOrElse(taskId, 0.0))
         weights += taskId -> weight
         receivedWeightForCurrentTick += 1
         if (receivedWeightForCurrentTick == taskCount) {
@@ -66,7 +68,7 @@ class PageRankController (taskContext : TaskContext, conf: UserConfig) extends T
   }
 
   private def continueIteration: Boolean = {
-    (tick < iterationMax) && deltas.values.foldLeft(false) {(deltaExceed, value) =>
+    (tick < iterationMax) && deltas.values.foldLeft(false) { (deltaExceed, value) =>
       deltaExceed || value > delta
     }
   }
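
For reference, the foldLeft in continueIteration above just checks whether any task's delta still exceeds the threshold, i.e. it behaves like deltas.values.exists(_ > delta). A small self-contained sketch of that equivalence (the map keys and numeric values are illustrative):

    object ContinueIterationSketch {
      def main(args: Array[String]): Unit = {
        val delta = 0.001
        val deltas = Map("task0" -> 0.0005, "task1" -> 0.002) // per-task deltas, illustrative
        // the foldLeft form used in PageRankController
        val foldForm = deltas.values.foldLeft(false) { (deltaExceed, value) =>
          deltaExceed || value > delta
        }
        // equivalent, more direct form
        val existsForm = deltas.values.exists(_ > delta)
        assert(foldForm == existsForm)
        println(s"continue iteration: $foldForm")
      }
    }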

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
index 87c30c2..9f38cd7 100644
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
+++ b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,17 @@
 package io.gearpump.experiments.pagerank
 
 import akka.actor.Actor.Receive
-import io.gearpump.streaming.task.{Task, TaskContext, TaskId, TaskWrapper}
+
 import io.gearpump.cluster.UserConfig
-import PageRankApplication.NodeWithTaskId
-import PageRankController.Tick
-import PageRankWorker.{LatestWeight, UpdateWeight}
+import io.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
+import io.gearpump.experiments.pagerank.PageRankController.Tick
+import io.gearpump.experiments.pagerank.PageRankWorker.{LatestWeight, UpdateWeight}
+import io.gearpump.streaming.task.{Task, TaskContext, TaskId, TaskWrapper}
 import io.gearpump.util.Graph
 
-class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+class PageRankWorker(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.taskId
 
-
   private var weight: Double = 1.0
   private var upstreamWeights = Map.empty[TaskId, Double]
 
@@ -41,14 +41,15 @@ class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(t
     node.taskId == taskContext.taskId.index
   }.get
 
-  private val downstream = graph.outgoingEdgesOf(node).map(_._3).map(id => taskId.copy(index = id.taskId)).toSeq
+  private val downstream = graph.outgoingEdgesOf(node).map(_._3)
+    .map(id => taskId.copy(index = id.taskId)).toSeq
   private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).length
 
-  LOG.info(s"downstream nodes: $downstream" )
+  LOG.info(s"downstream nodes: $downstream")
 
   private var tick = 0
 
-  private def output(msg: AnyRef, tasks: TaskId *): Unit = {
+  private def output(msg: AnyRef, tasks: TaskId*): Unit = {
     taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*)
   }
 
@@ -57,7 +58,7 @@ class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(t
       this.tick = tick
 
       if (downstream.length == 0) {
-        // if there is no downstream, we will evenly distribute our page rank to
+        // If there is no downstream, we will evenly distribute our page rank to
         // every node in the graph
         val update = UpdateWeight(taskId, weight / taskCount)
         output(update, allTasks: _*)
@@ -65,10 +66,10 @@ class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(t
         val update = UpdateWeight(taskId, weight / downstream.length)
         output(update, downstream: _*)
       }
-    case update@ UpdateWeight(upstreamTaskId, weight) =>
+    case update@UpdateWeight(upstreamTaskId, weight) =>
       upstreamWeights += upstreamTaskId -> weight
       if (upstreamWeights.size == upstreamCount) {
-        val nextWeight = upstreamWeights.foldLeft(0.0) {(sum, item) => sum + item._2}
+        val nextWeight = upstreamWeights.foldLeft(0.0) { (sum, item) => sum + item._2 }
         this.upstreamWeights = Map.empty[TaskId, Double]
         this.weight = nextWeight
         output(LatestWeight(taskId, weight, tick), TaskId(0, 0))
@@ -76,7 +77,7 @@ class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(t
   }
 }
 
-object PageRankWorker{
+object PageRankWorker {
   case class UpdateWeight(taskId: TaskId, weight: Double)
   case class LatestWeight(taskId: TaskId, weight: Double, tick: Int)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
index 2fb9b81..3877974 100644
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
+++ b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,11 +17,12 @@
  */
 package io.gearpump.experiments.pagerank.example
 
-import io.gearpump.experiments.pagerank.PageRankApplication
 import io.gearpump.cluster.client.ClientContext
-import io.gearpump.util.{AkkaApp, Graph}
+import io.gearpump.experiments.pagerank.PageRankApplication
 import io.gearpump.util.Graph.Node
+import io.gearpump.util.{AkkaApp, Graph}
 
+/** A very simple PageRank example, Cyclic graph is not supported */
 object PageRankExample extends AkkaApp {
 
   val a = "a"
@@ -29,7 +30,7 @@ object PageRankExample extends AkkaApp {
   val c = "c"
   val d = "d"
 
-  def help: Unit = Unit
+  def help(): Unit = Unit
 
   def main(akkaConf: Config, args: Array[String]): Unit = {
     val pageRankGraph = Graph(a ~> b, a ~> c, a ~> d, b ~> a, b ~> d, d ~> b, d ~> c, c ~> b)
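
The pageRankGraph above is built with Gearpump's Graph DSL: importing Graph.Node brings the ~> edge operator into scope for arbitrary vertex values, and Graph(...) collects the resulting edges. A minimal sketch of the DSL, reusing outgoingEdgesOf as seen in PageRankWorker (vertex names are illustrative):

    import io.gearpump.util.Graph
    import io.gearpump.util.Graph.Node

    object GraphDslSketch {
      def main(args: Array[String]): Unit = {
        val g = Graph("a" ~> "b", "a" ~> "c", "b" ~> "c")
        // outgoingEdgesOf yields (from, edgeData, to) triples; _._3 is the downstream vertex
        val downstreamOfA = g.outgoingEdgesOf("a").map(_._3)
        println(downstreamOfA)
      }
    }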

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
index a7e1f92..b517126 100644
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
+++ b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,57 +18,58 @@
 
 package io.gearpump.streaming.examples.complexdag
 
-import io.gearpump.streaming.{StreamApplication, Processor}
-import io.gearpump.streaming.task.TaskContext
+import org.slf4j.Logger
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import io.gearpump.partitioner.HashPartitioner
+import io.gearpump.streaming.task.TaskContext
+import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.util.Graph.{Node => GraphNode}
 import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-import org.slf4j.Logger
 
-/*
- digraph flow {
-     Source_0 -> Sink_0;
-     Source_0 -> Sink_1;
-     Source_0 -> Sink_2;
-     Source_0 -> Node_1;
-     Source_1 -> Node_0;
-     Node_0 -> Sink_3;
-     Node_1 -> Sink_3;
-     Node_1 -> Sink_4;
-     Node_1 -> Node_4;
-     Node_2 -> Node_3;
-     Node_1 -> Node_3;
-     Source_0 -> Node_2;
-     Source_0 -> Node_3;
-     Node_3 -> Sink_3;
-     Node_4 -> Sink_3;
-     Source_1 -> Sink_4;
- }
-*/
-case class Source_0(_context : TaskContext, _conf: UserConfig) extends Source(_context, _conf)
-case class Source_1(_context : TaskContext, _conf: UserConfig) extends Source(_context, _conf)
-case class Node_0(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_1(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_2(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_3(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_4(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Sink_0(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_1(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_2(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_3(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_4(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Source_0(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf)
+case class Source_1(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf)
+case class Node_0(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_1(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_2(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_3(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_4(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Sink_0(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_1(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_2(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_3(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_4(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
 
+/**
+ * digraph flow {
+ *   Source_0 -> Sink_0;
+ *   Source_0 -> Sink_1;
+ *   Source_0 -> Sink_2;
+ *   Source_0 -> Node_1;
+ *   Source_1 -> Node_0;
+ *   Node_0 -> Sink_3;
+ *   Node_1 -> Sink_3;
+ *   Node_1 -> Sink_4;
+ *   Node_1 -> Node_4;
+ *   Node_2 -> Node_3;
+ *   Node_1 -> Node_3;
+ *   Source_0 -> Node_2;
+ *   Source_0 -> Node_3;
+ *   Node_3 -> Sink_3;
+ *   Node_4 -> Sink_3;
+ *   Source_1 -> Sink_4;
+ * }
+ */
 object Dag extends AkkaApp with ArgumentsParser {
   private val LOG: Logger = LogUtil.getLogger(getClass)
   val RUN_FOR_EVER = -1
 
   override val options: Array[(String, CLIOption[Any])] = Array.empty
 
-  def application(config: ParseResult) : StreamApplication = {
-    
+  def application(config: ParseResult): StreamApplication = {
+
     val source_0 = Processor[Source_0](1)
     val source_1 = Processor[Source_1](1)
     val node_0 = Processor[Node_0](1)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
index e8837ed..dfad6c8 100644
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
+++ b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,16 @@
 
 package io.gearpump.streaming.examples.complexdag
 
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
-class Node (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
-  override def onStart(startTime : StartTime) : Unit = {}
+  override def onStart(startTime: StartTime): Unit = {}
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message): Unit = {
     val list = msg.msg.asInstanceOf[Vector[String]]
     output(new Message(list :+ getClass.getCanonicalName, System.currentTimeMillis()))
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
index adc3115..b091a17 100644
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
+++ b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,11 @@
 
 package io.gearpump.streaming.examples.complexdag
 
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import scala.collection.mutable
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
-
-import scala.collection.mutable
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
 class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
 
@@ -42,5 +42,4 @@ class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
       case _ =>
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
index 80cd8d2..df656ac 100644
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
+++ b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,12 +18,12 @@
 
 package io.gearpump.streaming.examples.complexdag
 
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
 class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import taskContext.{output, self}
+  import taskContext.output
 
   override def onStart(startTime: StartTime): Unit = {
     self ! Message("start")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
index 02a1017..b142d8d 100644
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
+++ b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,18 @@
 
 package io.gearpump.streaming.examples.complexdag
 
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{TestUtil, MasterHarness}
-import io.gearpump.util.Util
+import scala.concurrent.Future
+import scala.util.Success
+
 import org.scalatest._
 import org.scalatest.prop.PropertyChecks
-import scala.util.Success
-import scala.concurrent.Future
 
-class DagSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
+import io.gearpump.cluster.ClientToMaster.SubmitApplication
+import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import io.gearpump.cluster.{MasterHarness, TestUtil}
+
+class DagSpec extends PropSpec with PropertyChecks
+  with Matchers with BeforeAndAfterAll with MasterHarness {
 
   override def beforeAll {
     startActorSystem()
@@ -37,7 +39,7 @@ class DagSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndA
     shutdownActorSystem()
   }
 
-  override def config = TestUtil.DEFAULT_CONFIG
+  protected override def config = TestUtil.DEFAULT_CONFIG
 
   property("Dag should succeed to submit application with required arguments") {
     val requiredArgs = Array.empty[String]
@@ -45,7 +47,9 @@ class DagSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndA
     val masterReceiver = createMockMaster()
     val args = requiredArgs
 
-    Future{Dag.main(masterConfig, args)}
+    Future {
+      Dag.main(masterConfig, args)
+    }
     masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
     masterReceiver.reply(SubmitApplicationResult(Success(0)))
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
index d151651..35c5824 100644
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
+++ b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,25 +17,25 @@
  */
 package io.gearpump.streaming.examples.complexdag
 
-import io.gearpump.streaming.MockUtil
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import MockUtil._
-import org.mockito.ArgumentMatcher
-import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.MockUtil._
+
 class NodeSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
 
   val context = MockUtil.mockTaskContext
 
   val node = new Node(context, UserConfig.empty)
 
-  property("Node should send a Vector[String](classOf[Node].getCanonicalName, classOf[Node].getCanonicalName"){
+  property("Node should send a Vector[String](classOf[Node].getCanonicalName, " +
+    "classOf[Node].getCanonicalName") {
     val list = Vector(classOf[Node].getCanonicalName)
-    val expected = Vector(classOf[Node].getCanonicalName,classOf[Node].getCanonicalName)
+    val expected = Vector(classOf[Node].getCanonicalName, classOf[Node].getCanonicalName)
     node.onNext(Message(list))
     verify(context).output(argMatch[Message](_.msg == expected))
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
index b26f639..341f6c6 100644
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
+++ b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,19 +17,21 @@
  */
 package io.gearpump.streaming.examples.complexdag
 
-import io.gearpump.streaming.MockUtil
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+
 class SinkSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
 
   val context = MockUtil.mockTaskContext
 
   val sink = new Sink(context, UserConfig.empty)
 
-  property("Sink should send a Vector[String](classOf[Sink].getCanonicalName, classOf[Sink].getCanonicalName"){
+  property("Sink should send a Vector[String](classOf[Sink].getCanonicalName, " +
+    "classOf[Sink].getCanonicalName") {
     val list = Vector(classOf[Sink].getCanonicalName)
     val expected = Vector(classOf[Sink].getCanonicalName, classOf[Sink].getCanonicalName)
     sink.onNext(Message(list))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
index 21ae6ab..faa7aa7 100644
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
+++ b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,15 +18,14 @@
 package io.gearpump.streaming.examples.complexdag
 
 import akka.actor.ActorSystem
-import io.gearpump.streaming.MockUtil
-import io.gearpump.Message
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import MockUtil._
-import org.mockito.ArgumentMatcher
-import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.{Matchers, WordSpec}
 
+import io.gearpump.Message
+import io.gearpump.cluster.{TestUtil, UserConfig}
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.MockUtil._
+
 class SourceSpec extends WordSpec with Matchers {
 
   "Source" should {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
index c8c1135..3b53c9c 100644
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
+++ b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +18,22 @@
 package io.gearpump.streaming.examples.fsio
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+import scala.language.implicitConversions
+
+import org.apache.hadoop.conf.Configuration
 
 import io.gearpump.cluster.UserConfig
 import io.gearpump.util.Constants._
-import org.apache.hadoop.conf.Configuration
 
-import scala.language.implicitConversions
+class HadoopConfig(config: UserConfig) {
 
-class HadoopConfig(config: UserConfig)  {
+  def withHadoopConf(conf: Configuration): UserConfig = {
+    config.withBytes(HADOOP_CONF, serializeHadoopConf(conf))
+  }
 
-  def withHadoopConf(conf : Configuration) : UserConfig = config.withBytes(HADOOP_CONF, serializeHadoopConf(conf))
-  def hadoopConf : Configuration = deserializeHadoopConf(config.getBytes(HADOOP_CONF).get)
+  def hadoopConf: Configuration = deserializeHadoopConf(config.getBytes(HADOOP_CONF).get)
 
-  private def serializeHadoopConf(conf: Configuration) : Array[Byte] = {
+  private def serializeHadoopConf(conf: Configuration): Array[Byte] = {
     val out = new ByteArrayOutputStream()
     val dataOut = new DataOutputStream(out)
     conf.write(dataOut)
@@ -38,10 +41,10 @@ class HadoopConfig(config: UserConfig)  {
     out.toByteArray
   }
 
-  private def deserializeHadoopConf(bytes: Array[Byte]) : Configuration = {
+  private def deserializeHadoopConf(bytes: Array[Byte]): Configuration = {
     val in = new ByteArrayInputStream(bytes)
     val dataIn = new DataInputStream(in)
-    val result= new Configuration()
+    val result = new Configuration()
     result.readFields(dataIn)
     dataIn.close()
     result
@@ -49,8 +52,8 @@ class HadoopConfig(config: UserConfig)  {
 }
 
 object HadoopConfig {
-  def empty = new HadoopConfig(UserConfig.empty)
-  def apply(config: UserConfig) = new HadoopConfig(config)
+  def empty: HadoopConfig = new HadoopConfig(UserConfig.empty)
+  def apply(config: UserConfig): HadoopConfig = new HadoopConfig(config)
 
   implicit def userConfigToHadoopConfig(userConf: UserConfig): HadoopConfig = {
     HadoopConfig(userConf)
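
Because the companion object defines an implicit conversion from UserConfig, the Hadoop helpers above can be called directly on a plain UserConfig once HadoopConfig._ is imported, which is exactly how HadoopConfigSpec exercises it further down. A minimal usage sketch (the key and value are illustrative):

    import org.apache.hadoop.conf.Configuration

    import io.gearpump.cluster.UserConfig
    import io.gearpump.streaming.examples.fsio.HadoopConfig._

    object HadoopConfigUsage {
      def main(args: Array[String]): Unit = {
        val hadoopConf = new Configuration()
        hadoopConf.set("my.key", "my.value") // illustrative key/value
        // withHadoopConf serializes the Configuration into the UserConfig,
        // and hadoopConf deserializes it back out
        val user = UserConfig.empty.withHadoopConf(hadoopConf)
        assert(user.hadoopConf.get("my.key") == "my.value")
      }
    }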

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
index 808f3b1..13fc3f9 100644
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
+++ b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,20 +19,21 @@ package io.gearpump.streaming.examples.fsio
 
 import java.io.File
 import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
 
 import akka.actor.Cancellable
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import SeqFileStreamProcessor._
-import HadoopConfig._
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.SequenceFile._
 import org.apache.hadoop.io.{SequenceFile, Text}
 
-import scala.concurrent.duration.FiniteDuration
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.examples.fsio.HadoopConfig._
+import io.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
-class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) extends Task(taskContext, config){
+class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
+  extends Task(taskContext, config) {
 
   import taskContext.taskId
 
@@ -43,16 +44,17 @@ class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) exte
   val value = new Text()
   val hadoopConf = config.hadoopConf
 
-  private var msgCount : Long = 0
-  private var snapShotKVCount : Long = 0
-  private var snapShotTime : Long = 0
+  private var msgCount: Long = 0
+  private var snapShotKVCount: Long = 0
+  private var snapShotTime: Long = 0
   private var scheduler: Cancellable = null
 
-  override def onStart(startTime : StartTime) = {
+  override def onStart(startTime: StartTime): Unit = {
 
     val fs = FileSystem.get(hadoopConf)
     fs.deleteOnExit(outputPath)
-    writer = SequenceFile.createWriter(hadoopConf, Writer.file(outputPath), Writer.keyClass(textClass), Writer.valueClass(textClass))
+    writer = SequenceFile.createWriter(hadoopConf, Writer.file(outputPath),
+      Writer.keyClass(textClass), Writer.valueClass(textClass))
 
     scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
       new FiniteDuration(5, TimeUnit.SECONDS))(reportStatus())
@@ -62,7 +64,7 @@ class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) exte
 
   override def onNext(msg: Message): Unit = {
     val kv = msg.msg.asInstanceOf[String].split("\\+\\+")
-    if(kv.length >= 2) {
+    if (kv.length >= 2) {
       key.set(kv(0))
       value.set(kv(1))
       writer.append(key, value)
@@ -70,7 +72,7 @@ class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) exte
     msgCount += 1
   }
 
-  override def onStop(): Unit ={
+  override def onStop(): Unit = {
     if (scheduler != null) {
       scheduler.cancel()
     }
@@ -78,14 +80,17 @@ class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) exte
     LOG.info("sequence file bolt stopped")
   }
 
-  def reportStatus() = {
-    val current : Long = System.currentTimeMillis()
-    LOG.info(s"Task $taskId Throughput: ${(msgCount - snapShotKVCount, (current - snapShotTime) / 1000)} (KVPairs, second)")
+  private def reportStatus() = {
+    val current: Long = System.currentTimeMillis()
+    LOG.info(s"Task $taskId Throughput: ${
+      (msgCount - snapShotKVCount,
+        (current - snapShotTime) / 1000)
+    } (KVPairs, second)")
     snapShotKVCount = msgCount
     snapShotTime = current
   }
 }
 
-object SeqFileStreamProcessor{
+object SeqFileStreamProcessor {
   val OUTPUT_PATH = "outputpath"
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
index 5d32f74..f9b0d22 100644
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
+++ b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,18 +17,20 @@
  */
 package io.gearpump.streaming.examples.fsio
 
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import SeqFileStreamProducer._
-import HadoopConfig._
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.SequenceFile._
 import org.apache.hadoop.io.{SequenceFile, Text}
 
-class SeqFileStreamProducer(taskContext : TaskContext, config: UserConfig) extends Task(taskContext, config){
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.examples.fsio.HadoopConfig._
+import io.gearpump.streaming.examples.fsio.SeqFileStreamProducer._
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
+  extends Task(taskContext, config) {
 
-  import taskContext.{output, self}
+  import taskContext.output
 
   val value = new Text()
   val key = new Text()
@@ -37,14 +39,14 @@ class SeqFileStreamProducer(taskContext : TaskContext, config: UserConfig) exten
   val fs = FileSystem.get(hadoopConf)
   val inputPath = new Path(config.getString(INPUT_PATH).get)
 
-  override def onStart(startTime : StartTime) = {
+  override def onStart(startTime: StartTime): Unit = {
     reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
     self ! Start
     LOG.info("sequence file spout initiated")
   }
 
-  override def onNext(msg: Message) = {
-    if(reader.next(key, value)){
+  override def onNext(msg: Message): Unit = {
+    if (reader.next(key, value)) {
       output(Message(key + "++" + value))
     } else {
       reader.close()
@@ -53,13 +55,13 @@ class SeqFileStreamProducer(taskContext : TaskContext, config: UserConfig) exten
     self ! Continue
   }
 
-  override def onStop(): Unit ={
+  override def onStop(): Unit = {
     reader.close()
   }
 }
 
-object SeqFileStreamProducer{
-  def INPUT_PATH = "inputpath"
+object SeqFileStreamProducer {
+  def INPUT_PATH: String = "inputpath"
 
   val Start = Message("start")
   val Continue = Message("continue")
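
The producer/processor pair in this example streams Text key/value records through a Hadoop SequenceFile. A minimal local round-trip sketch using the same Hadoop API calls that appear in the two tasks (the file path is illustrative and the default FileSystem is assumed to be writable):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.SequenceFile.{Reader, Writer}
    import org.apache.hadoop.io.{SequenceFile, Text}

    object SeqFileRoundTrip {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        val path = new Path("/tmp/seqfile-roundtrip") // illustrative path
        val textClass = classOf[Text]

        // write one key/value pair, mirroring SeqFileStreamProcessor.onNext
        val writer = SequenceFile.createWriter(conf, Writer.file(path),
          Writer.keyClass(textClass), Writer.valueClass(textClass))
        writer.append(new Text("key"), new Text("value"))
        writer.close()

        // read it back, mirroring SeqFileStreamProducer.onNext
        val key = new Text()
        val value = new Text()
        val reader = new SequenceFile.Reader(conf, Reader.file(path))
        while (reader.next(key, value)) {
          println(s"$key++$value")
        }
        reader.close()
      }
    }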

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
index c752bba..7272f2b 100644
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
+++ b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,39 +17,44 @@
  */
 package io.gearpump.streaming.examples.fsio
 
-import io.gearpump.streaming.{StreamApplication, Processor}
+import org.apache.hadoop.conf.Configuration
+import org.slf4j.Logger
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import io.gearpump.partitioner.ShufflePartitioner
+import io.gearpump.streaming.examples.fsio.HadoopConfig._
+import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.util.Graph._
-import HadoopConfig._
 import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-import org.apache.hadoop.conf.Configuration
-import org.slf4j.Logger
 
 object SequenceFileIO extends AkkaApp with ArgumentsParser {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
   override val options: Array[(String, CLIOption[Any])] = Array(
-    "source"-> CLIOption[Int]("<sequence file reader number>", required = false, defaultValue = Some(1)),
-    "sink"-> CLIOption[Int]("<sequence file writer number>", required = false, defaultValue = Some(1)),
-    "input"-> CLIOption[String]("<input file path>", required = true),
-    "output"-> CLIOption[String]("<output file directory>", required = true)
+    "source" -> CLIOption[Int]("<sequence file reader number>", required = false,
+      defaultValue = Some(1)),
+    "sink" -> CLIOption[Int]("<sequence file writer number>", required = false,
+      defaultValue = Some(1)),
+    "input" -> CLIOption[String]("<input file path>", required = true),
+    "output" -> CLIOption[String]("<output file directory>", required = true)
   )
 
-  def application(config: ParseResult) : StreamApplication = {
+  def application(config: ParseResult): StreamApplication = {
     val spoutNum = config.getInt("source")
     val boltNum = config.getInt("sink")
     val input = config.getString("input")
     val output = config.getString("output")
-    val appConfig = UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, input).withString(SeqFileStreamProcessor.OUTPUT_PATH, output)
+    val appConfig = UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, input)
+      .withString(SeqFileStreamProcessor.OUTPUT_PATH, output)
     val hadoopConfig = appConfig.withHadoopConf(new Configuration())
     val partitioner = new ShufflePartitioner()
     val streamProducer = Processor[SeqFileStreamProducer](spoutNum)
     val streamProcessor = Processor[SeqFileStreamProcessor](boltNum)
 
-    val app = StreamApplication("SequenceFileIO", Graph(streamProducer ~ partitioner ~> streamProcessor), hadoopConfig)
+    val app = StreamApplication("SequenceFileIO",
+      Graph(streamProducer ~ partitioner ~> streamProcessor), hadoopConfig)
     app
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
index 64ba697..e5dbe0b 100644
--- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,10 +17,11 @@
  */
 package io.gearpump.streaming.examples.fsio
 
-import io.gearpump.cluster.UserConfig
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.{Matchers, WordSpec}
 
+import io.gearpump.cluster.UserConfig
+
 class HadoopConfigSpec extends WordSpec with Matchers {
 
   "HadoopConfig" should {
@@ -32,7 +33,7 @@ class HadoopConfigSpec extends WordSpec with Matchers {
 
       val user = UserConfig.empty
 
-      import HadoopConfig._
+      import io.gearpump.streaming.examples.fsio.HadoopConfig._
       assert(user.withHadoopConf(hadoopConf).hadoopConf.get(key) == value)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
index 99c7113..bb0d26b 100644
--- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,15 +18,10 @@
 package io.gearpump.streaming.examples.fsio
 
 import java.io.File
+import scala.collection.mutable.ArrayBuffer
 
 import akka.actor.ActorSystem
 import akka.testkit.TestProbe
-import io.gearpump.streaming.{Processor, MockUtil}
-import io.gearpump.streaming.task.{StartTime, TaskId}
-import io.gearpump.Message
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.streaming.Processor
-import io.gearpump.streaming.task.StartTime
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.SequenceFile.Reader
@@ -36,8 +31,13 @@ import org.scalacheck.Gen
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 
-import scala.collection.mutable.ArrayBuffer
-class SeqFileStreamProcessorSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
+import io.gearpump.Message
+import io.gearpump.cluster.{TestUtil, UserConfig}
+import io.gearpump.streaming.task.{StartTime, TaskId}
+import io.gearpump.streaming.{MockUtil, Processor}
+class SeqFileStreamProcessorSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
+
   val kvPairs = new ArrayBuffer[(String, String)]
   val outputDirectory = "SeqFileStreamProcessor_Test"
   val sequenceFilePath = new Path(outputDirectory + File.separator + TaskId(0, 0))
@@ -56,7 +56,8 @@ class SeqFileStreamProcessorSpec extends PropSpec with PropertyChecks with Match
     implicit val system1 = ActorSystem("SeqFileStreamProcessor", TestUtil.DEFAULT_CONFIG)
     val system2 = ActorSystem("Reporter", TestUtil.DEFAULT_CONFIG)
     val watcher = TestProbe()(system1)
-    val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProcessor.OUTPUT_PATH, outputDirectory)).withHadoopConf(new Configuration())
+    val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProcessor.OUTPUT_PATH,
+      outputDirectory)).withHadoopConf(new Configuration())
     val context = MockUtil.mockTaskContext
 
     val processorDescription =
@@ -80,7 +81,7 @@ class SeqFileStreamProcessorSpec extends PropSpec with PropertyChecks with Match
     val reader = new SequenceFile.Reader(hadoopConf, Reader.file(sequenceFilePath))
     kvPairs.foreach { kv =>
       val (key, value) = kv
-      if(value.length > 0 && reader.next(_key, _value)) {
+      if (value.length > 0 && reader.next(_key, _value)) {
         assert(_key.toString == key && _value.toString == value)
       }
     }
@@ -90,4 +91,4 @@ class SeqFileStreamProcessorSpec extends PropSpec with PropertyChecks with Match
   after {
     fs.deleteOnExit(new Path(outputDirectory))
   }
-}
+}
\ No newline at end of file
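
Note: this spec checks the processor's output by reading it back from a Hadoop SequenceFile
with the Reader shown above; the companion producer spec below seeds its input with
SequenceFile.createWriter. A self-contained sketch of that write/read round trip, using only
Writer/Reader options that appear in these two fsio specs (the path and the key/value strings
are hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.SequenceFile.{Reader, Writer}
    import org.apache.hadoop.io.{SequenceFile, Text}

    val hadoopConf = new Configuration()
    val path = new Path("SeqFile_RoundTrip_Test") // hypothetical local path
    val writer = SequenceFile.createWriter(hadoopConf, Writer.file(path),
      Writer.keyClass(classOf[Text]), Writer.valueClass(classOf[Text]))
    writer.append(new Text("key1"), new Text("value1"))
    writer.close()

    // Read the pair back and check it survived the round trip.
    val reader = new SequenceFile.Reader(hadoopConf, Reader.file(path))
    val (key, value) = (new Text(), new Text())
    while (reader.next(key, value)) {
      assert(key.toString == "key1" && value.toString == "value1")
    }
    reader.close()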

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
index cb6f553..04dafa7 100644
--- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,26 +17,26 @@
  */
 package io.gearpump.streaming.examples.fsio
 
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.examples.fsio.HadoopConfig
-import io.gearpump.streaming.task.StartTime
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import MockUtil._
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.SequenceFile.Writer
 import org.apache.hadoop.io.{SequenceFile, Text}
-import org.mockito.ArgumentMatcher
-import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 
-import scala.collection.mutable.ArrayBuffer
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.MockUtil._
+import io.gearpump.streaming.task.StartTime
+
+class SeqFileStreamProducerSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
 
-class SeqFileStreamProducerSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter{
   val kvPairs = new ArrayBuffer[(String, String)]
   val inputFile = "SeqFileStreamProducer_Test"
   val sequenceFilePath = new Path(inputFile)
@@ -53,7 +53,8 @@ class SeqFileStreamProducerSpec extends PropSpec with PropertyChecks with Matche
 
   before {
     fs.deleteOnExit(sequenceFilePath)
-    val writer = SequenceFile.createWriter(hadoopConf, Writer.file(sequenceFilePath), Writer.keyClass(textClass), Writer.valueClass(textClass))
+    val writer = SequenceFile.createWriter(hadoopConf, Writer.file(sequenceFilePath),
+      Writer.keyClass(textClass), Writer.valueClass(textClass))
     forAll(kvGenerator) { kv =>
       _key.set(kv._1)
       _value.set(kv._2)
@@ -63,9 +64,11 @@ class SeqFileStreamProducerSpec extends PropSpec with PropertyChecks with Matche
     writer.close()
   }
 
-  property("SeqFileStreamProducer should read the key-value pairs from a sequence file and deliver them") {
+  property("SeqFileStreamProducer should read the key-value pairs from " +
+    "a sequence file and deliver them") {
 
-    val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, inputFile)).withHadoopConf(new Configuration())
+    val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH,
+      inputFile)).withHadoopConf(new Configuration())
 
     val context = MockUtil.mockTaskContext
 
@@ -74,7 +77,8 @@ class SeqFileStreamProducerSpec extends PropSpec with PropertyChecks with Matche
     producer.onNext(Message("start"))
 
     val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet
-    verify(context).output(argMatch[Message](msg => expected.contains(msg.msg.asInstanceOf[String])))
+    verify(context).output(argMatch[Message](msg =>
+      expected.contains(msg.msg.asInstanceOf[String])))
   }
 
   after {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
index f10227d..efb5e44 100644
--- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,20 @@
 
 package io.gearpump.streaming.examples.fsio
 
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
+import com.typesafe.config.Config
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
+
 import io.gearpump.cluster.ClientToMaster.SubmitApplication
 import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
 import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.util.Util
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
 
-import scala.util.{Success, Try}
-import scala.concurrent.Future
+class SequenceFileIOSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
 
-class SequenceFileIOSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
   override def beforeAll {
     startActorSystem()
   }
@@ -37,7 +40,7 @@ class SequenceFileIOSpec extends PropSpec with PropertyChecks with Matchers with
     shutdownActorSystem()
   }
 
-  override def config = TestUtil.DEFAULT_CONFIG
+  override def config: Config = TestUtil.DEFAULT_CONFIG
 
   property("SequenceFileIO should succeed to submit application with required arguments") {
     val requiredArgs = Array(
@@ -58,7 +61,9 @@ class SequenceFileIOSpec extends PropSpec with PropertyChecks with Matchers with
     forAll(validArgs) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
       val args = requiredArgs ++ optionalArgs
 
-      Future {SequenceFileIO.main(masterConfig, args)}
+      Future {
+        SequenceFileIO.main(masterConfig, args)
+      }
       masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
       masterReceiver.reply(SubmitApplicationResult(Success(0)))
     }
@@ -75,5 +80,4 @@ class SequenceFileIOSpec extends PropSpec with PropertyChecks with Matchers with
       assert(Try(SequenceFileIO.main(args)).isFailure, "missing required arguments, print usage")
     }
   }
-
 }
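
Note: the reformatted Future block above is the submission-test pattern these example specs
share -- main() runs off the test thread so the spec itself can stand in for the master and
answer the SubmitApplication handshake. A hedged sketch of the pattern, assuming the
MasterHarness members that appear in this hunk (masterConfig, masterReceiver,
PROCESS_BOOT_TIME), an args array built as in the property check, and a globally imported
execution context:

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.Success

    import io.gearpump.cluster.ClientToMaster.SubmitApplication
    import io.gearpump.cluster.MasterToClient.SubmitApplicationResult

    // Run the example off the test thread so the probe below can act as the master.
    Future {
      SequenceFileIO.main(masterConfig, args)
    }
    // The mock master should receive the submission and reply with success.
    masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
    masterReceiver.reply(SubmitApplicationResult(Success(0)))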

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
index b4d9a04..35b6594 100644
--- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
+++ b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,6 +19,8 @@
 package io.gearpump.streaming.examples.kafka
 
 import akka.actor.ActorSystem
+import org.slf4j.Logger
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
@@ -29,21 +31,26 @@ import io.gearpump.streaming.sink.DataSinkProcessor
 import io.gearpump.streaming.source.DataSourceProcessor
 import io.gearpump.util.Graph._
 import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-import org.slf4j.Logger
 
 object KafkaReadWrite extends AkkaApp with ArgumentsParser {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
   override val options: Array[(String, CLIOption[Any])] = Array(
-    "source" -> CLIOption[Int]("<hom many kafka producer tasks>", required = false, defaultValue = Some(1)),
-    "sink" -> CLIOption[Int]("<hom many kafka processor tasks>", required = false, defaultValue = Some(1)),
-    "zookeeperConnect" -> CLIOption[String]("<zookeeper connect string>", required = false, defaultValue = Some("localhost:2181")),
-    "brokerList" -> CLIOption[String]("<broker server list string>", required = false, defaultValue = Some("localhost:9092")),
-    "sourceTopic" -> CLIOption[String]("<kafka source topic>", required = false, defaultValue = Some("topic1")),
-    "sinkTopic" -> CLIOption[String]("<kafka sink topic>", required = false, defaultValue = Some("topic2"))
+    "source" -> CLIOption[Int]("<how many kafka producer tasks>", required = false,
+      defaultValue = Some(1)),
+    "sink" -> CLIOption[Int]("<how many kafka processor tasks>", required = false,
+      defaultValue = Some(1)),
+    "zookeeperConnect" -> CLIOption[String]("<zookeeper connect string>", required = false,
+      defaultValue = Some("localhost:2181")),
+    "brokerList" -> CLIOption[String]("<broker server list string>", required = false,
+      defaultValue = Some("localhost:9092")),
+    "sourceTopic" -> CLIOption[String]("<kafka source topic>", required = false,
+      defaultValue = Some("topic1")),
+    "sinkTopic" -> CLIOption[String]("<kafka sink topic>", required = false,
+      defaultValue = Some("topic2"))
   )
 
-  def application(config: ParseResult, system: ActorSystem) : StreamApplication = {
+  def application(config: ParseResult, system: ActorSystem): StreamApplication = {
     implicit val actorSystem = system
     val sourceNum = config.getInt("source")
     val sinkNum = config.getInt("sink")
@@ -59,7 +66,7 @@ object KafkaReadWrite extends AkkaApp with ArgumentsParser {
     val sink = new KafkaSink(sinkTopic, brokerList)
     val sinkProcessor = DataSinkProcessor(sink, sinkNum)
     val partitioner = new ShufflePartitioner
-    val computation = sourceProcessor  ~ partitioner ~> sinkProcessor
+    val computation = sourceProcessor ~ partitioner ~> sinkProcessor
     val app = StreamApplication("KafkaReadWrite", Graph(computation), appConfig)
     app
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
index 51bc3d6..6955bcc 100644
--- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
+++ b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,31 +19,34 @@
 package io.gearpump.streaming.examples.kafka.wordcount
 
 import akka.actor.ActorSystem
-import io.gearpump.streaming.kafka.lib.KafkaSourceConfig
-import io.gearpump.streaming.{StreamApplication, Processor}
-import io.gearpump.streaming.kafka.{KafkaSink, KafkaStorageFactory, KafkaSource}
-import io.gearpump.streaming.sink.DataSinkProcessor
-import io.gearpump.streaming.source.DataSourceProcessor
+import kafka.api.OffsetRequest
+import org.slf4j.Logger
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import io.gearpump.partitioner.HashPartitioner
+import io.gearpump.streaming.kafka.lib.KafkaSourceConfig
+import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory}
+import io.gearpump.streaming.sink.DataSinkProcessor
+import io.gearpump.streaming.source.DataSourceProcessor
+import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.util.Graph._
 import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-import kafka.api.OffsetRequest
-import org.slf4j.Logger
 
 object KafkaWordCount extends AkkaApp with ArgumentsParser {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
   override val options: Array[(String, CLIOption[Any])] = Array(
-    "source" -> CLIOption[Int]("<how many kafka source tasks>", required = false, defaultValue = Some(1)),
+    "source" -> CLIOption[Int]("<how many kafka source tasks>", required = false,
+      defaultValue = Some(1)),
     "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)),
     "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)),
-    "sink" -> CLIOption[Int]("<how many kafka sink tasks>", required = false, defaultValue = Some(1))
-    )
+    "sink" -> CLIOption[Int]("<how many kafka sink tasks>", required = false,
+      defaultValue = Some(1))
+  )
 
-  def application(config: ParseResult, system: ActorSystem) : StreamApplication = {
+  def application(config: ParseResult, system: ActorSystem): StreamApplication = {
     implicit val actorSystem = system
     val sourceNum = config.getInt("source")
     val splitNum = config.getInt("split")
@@ -61,7 +64,8 @@ object KafkaWordCount extends AkkaApp with ArgumentsParser {
     val sink = new KafkaSink("topic2", "localhost:9092")
     val sinkProcessor = DataSinkProcessor(sink, sinkNum)
     val partitioner = new HashPartitioner
-    val computation = sourceProcessor ~ partitioner ~> split ~ partitioner ~> sum ~ partitioner ~> sinkProcessor
+    val computation = sourceProcessor ~ partitioner ~> split ~ partitioner ~>
+      sum ~ partitioner ~> sinkProcessor
     val app = StreamApplication("KafkaWordCount", Graph(computation), appConfig)
     app
   }
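
Note: in the Graph DSL brought in by io.gearpump.util.Graph._, writing
processorA ~ partitioner ~> processorB declares an edge from A to B routed through the given
partitioner, so the wrapped expression above still describes one linear four-stage pipeline.
A minimal sketch of that wiring, assuming the source/split/sum/sink processors are built as in
the surrounding lines of this file:

    // a ~ edge ~> b adds an edge a --edge--> b; chains compose left to right.
    val partitioner = new HashPartitioner
    val computation = sourceProcessor ~ partitioner ~> split ~ partitioner ~>
      sum ~ partitioner ~> sinkProcessor
    val app = StreamApplication("KafkaWordCount", Graph(computation), appConfig)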

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala
index 499f1ac..b46d170 100644
--- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala
+++ b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,18 +19,20 @@
 package io.gearpump.streaming.examples.kafka.wordcount
 
 import com.twitter.bijection.Injection
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
-class Split(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
-  override def onStart(startTime : StartTime) : Unit = {
+  override def onStart(startTime: StartTime): Unit = {
   }
 
-  override def onNext(msg : Message) : Unit = {
-    Injection.invert[String, Array[Byte]](msg.msg.asInstanceOf[Array[Byte]]).foreach(_.split("\\s+").foreach(
-      word => output(new Message(word, msg.timestamp))))
+  override def onNext(msg: Message): Unit = {
+    Injection.invert[String, Array[Byte]](msg.msg.asInstanceOf[Array[Byte]])
+      .foreach(_.split("\\s+").foreach(
+        word => output(new Message(word, msg.timestamp))))
   }
 }
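
Note: Injection.invert[String, Array[Byte]] yields a scala.util.Try[String], so the .foreach in
the reformatted onNext above only runs when the raw Kafka payload decodes as valid UTF-8; bad
payloads are silently skipped. A standalone sketch of that decode-and-split step (the sample
payload is hypothetical):

    import com.twitter.bijection.Injection

    val payload: Array[Byte] = "hello gearpump world".getBytes("UTF-8")
    // Success(string) when the bytes are valid UTF-8, Failure (so an empty result) otherwise.
    val words: Seq[String] = Injection.invert[String, Array[Byte]](payload)
      .map(_.split("\\s+").toSeq)
      .getOrElse(Seq.empty)
    // words == Seq("hello", "gearpump", "world")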

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala
index 465754e..9c67733 100644
--- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala
+++ b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,18 +19,19 @@
 package io.gearpump.streaming.examples.kafka.wordcount
 
 import com.twitter.bijection.Injection
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
-class Sum(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
   private[wordcount] var wordcount = Map.empty[String, Long]
 
-  override def onStart(startTime : StartTime) : Unit = {}
+  override def onStart(startTime: StartTime): Unit = {}
 
-  override def onNext(message : Message) : Unit = {
+  override def onNext(message: Message): Unit = {
     val word = message.msg.asInstanceOf[String]
     val count = wordcount.getOrElse(word, 0L) + 1
     wordcount += word -> count
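
Note: Sum keeps its running totals in an immutable Map held in a var; getOrElse covers the
first occurrence of a word, and the reassignment swaps in an updated map on every message.
A minimal sketch of that accumulation step (the helper name is hypothetical, and whatever
onNext does with the count afterwards lies outside this hunk):

    var wordcount = Map.empty[String, Long]

    // Hypothetical helper wrapping the two update lines from onNext above.
    def updateCount(word: String): Long = {
      val count = wordcount.getOrElse(word, 0L) + 1
      wordcount += word -> count
      count
    }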

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
index 9e79d7e..35f7a62 100644
--- a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
+++ b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,19 @@
 
 package io.gearpump.streaming.examples.kafka.wordcount
 
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.util.Util
+import scala.concurrent.Future
+import scala.util.Success
+
+import com.typesafe.config.Config
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 
-import scala.util.Success
-import scala.concurrent.Future
+import io.gearpump.cluster.ClientToMaster.SubmitApplication
+import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import io.gearpump.cluster.{MasterHarness, TestUtil}
 
-class KafkaWordCountSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+class KafkaWordCountSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
 
   before {
     startActorSystem()
@@ -38,7 +40,7 @@ class KafkaWordCountSpec extends PropSpec with PropertyChecks with Matchers with
     shutdownActorSystem()
   }
 
-  override def config = TestUtil.DEFAULT_CONFIG
+  override def config: Config = TestUtil.DEFAULT_CONFIG
 
   property("KafkaWordCount should succeed to submit application with required arguments") {
     val requiredArgs = Array.empty[String]
@@ -58,7 +60,9 @@ class KafkaWordCountSpec extends PropSpec with PropertyChecks with Matchers with
     forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
       val args = requiredArgs ++ optionalArgs
 
-      Future {KafkaWordCount.main(masterConfig, args)}
+      Future {
+        KafkaWordCount.main(masterConfig, args)
+      }
 
       masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
       masterReceiver.reply(SubmitApplicationResult(Success(0)))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
index 864f97d..2cc6a16 100644
--- a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
+++ b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,15 +18,14 @@
 package io.gearpump.streaming.examples.kafka.wordcount
 
 import com.twitter.bijection.Injection
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest._
 import org.scalatest.mock.MockitoSugar
 
-import scala.language.postfixOps
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.TaskContext
 
 class SplitSpec extends FlatSpec with Matchers with MockitoSugar {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
index 6014d0f..4dcb9d7 100644
--- a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
+++ b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,23 +17,24 @@
  */
 package io.gearpump.streaming.examples.kafka.wordcount
 
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
+import scala.collection.mutable
+
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.{FlatSpec, Matchers}
 
-import scala.collection.mutable
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.task.StartTime
 
 class SumSpec extends FlatSpec with Matchers {
 
   it should "sum should calculate the frequency of the word correctly" in {
 
     val stringGenerator = Gen.alphaStr
-    val expectedWordCountMap : mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
+    val expectedWordCountMap: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
 
     val taskContext = MockUtil.mockTaskContext
 
@@ -42,7 +43,7 @@ class SumSpec extends FlatSpec with Matchers {
     val str = "once two two three three three"
 
     var totalWordCount = 0
-    stringGenerator.map {word =>
+    stringGenerator.map { word =>
       totalWordCount += 1
       expectedWordCountMap.put(word, expectedWordCountMap.getOrElse(word, 0L) + 1)
       sum.onNext(Message(word))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/README.md b/examples/streaming/sol/README.md
index e772af4..a8b10b3 100644
--- a/examples/streaming/sol/README.md
+++ b/examples/streaming/sol/README.md
@@ -1,6 +1,6 @@
 SOL is a throughput test. It will create multiple layers, and then do random shuffling between these layers.
 
-SOLPRoducer -> SOLProcessor -> SOLProcessor -> ...
+SOLProducer -> SOLProcessor -> SOLProcessor -> ...
 
 The original code comes from: https://github.com/yahoo/storm-perf-test
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala
index b531250..10c190c 100644
--- a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala
+++ b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,25 +18,30 @@
 
 package io.gearpump.streaming.examples.sol
 
-import io.gearpump.streaming.{StreamApplication, Processor}
+import org.slf4j.Logger
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import io.gearpump.partitioner.ShufflePartitioner
+import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.util.Graph._
 import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-import org.slf4j.Logger
 
 object SOL extends AkkaApp with ArgumentsParser {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
   override val options: Array[(String, CLIOption[Any])] = Array(
-    "streamProducer"-> CLIOption[Int]("<stream producer number>", required = false, defaultValue = Some(1)),
-    "streamProcessor"-> CLIOption[Int]("<stream processor number>", required = false, defaultValue = Some(1)),
-    "bytesPerMessage" -> CLIOption[Int]("<size of each message>", required = false, defaultValue = Some(100)),
-    "stages"-> CLIOption[Int]("<how many stages to run>", required = false, defaultValue = Some(2)))
+    "streamProducer" -> CLIOption[Int]("<stream producer number>", required = false,
+    defaultValue = Some(1)),
+    "streamProcessor" -> CLIOption[Int]("<stream processor number>", required = false,
+    defaultValue = Some(1)),
+    "bytesPerMessage" -> CLIOption[Int]("<size of each message>", required = false,
+    defaultValue = Some(100)),
+    "stages" -> CLIOption[Int]("<how many stages to run>", required = false,
+    defaultValue = Some(2)))
 
-  def application(config: ParseResult) : StreamApplication = {
+  def application(config: ParseResult): StreamApplication = {
     val spoutNum = config.getInt("streamProducer")
     val boltNum = config.getInt("streamProcessor")
     val bytesPerMessage = config.getInt("bytesPerMessage")