You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:45 UTC

[35/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
index eb2bc6e..319ee27 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,25 +17,28 @@
  */
 package io.gearpump.experiments.storm.topology
 
-import akka.actor.ActorRef
-import backtype.storm.spout.{SpoutOutputCollector, ISpout}
-import backtype.storm.task.{OutputCollector, IBolt, GeneralTopologyContext, TopologyContext}
 import java.util.{Map => JMap}
+
+import akka.actor.ActorRef
+import backtype.storm.spout.{ISpout, SpoutOutputCollector}
+import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext}
 import backtype.storm.tuple.Tuple
-import io.gearpump.experiments.storm.producer.StormSpoutOutputCollector
-import io.gearpump.{TimeStamp, Message}
-import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
-import io.gearpump.experiments.storm.util.StormOutputCollector
-import io.gearpump.streaming.{MockUtil, DAG}
-import io.gearpump.streaming.task.{TaskContext, StartTime, TaskId}
+import org.mockito.Matchers.{anyObject, eq => mockitoEq}
 import org.mockito.Mockito._
-import org.mockito.Matchers.{anyLong, anyObject, eq => mockitoEq}
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
-import org.scalatest.{PropSpec, Matchers}
+import org.scalatest.{Matchers, PropSpec}
 
-class GearpumpStormComponentSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+import io.gearpump.experiments.storm.producer.StormSpoutOutputCollector
+import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
+import io.gearpump.experiments.storm.util.StormOutputCollector
+import io.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
+import io.gearpump.streaming.{DAG, MockUtil}
+import io.gearpump.{Message, TimeStamp}
+
+class GearpumpStormComponentSpec
+  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
   property("GearpumpSpout lifecycle") {
     val config = mock[JMap[AnyRef, AnyRef]]
@@ -56,13 +59,14 @@ class GearpumpStormComponentSpec extends PropSpec with PropertyChecks with Match
     val gearpumpSpout = GearpumpSpout(config, spout, getDAG, getTopologyContext,
       getOutputCollector, ackEnabled = false, taskContext)
 
-    // start
+    // Start
     val startTime = mock[StartTime]
     gearpumpSpout.start(startTime)
 
-    verify(spout).open(mockitoEq(config), mockitoEq(topologyContext), anyObject[SpoutOutputCollector])
+    verify(spout).open(mockitoEq(config), mockitoEq(topologyContext),
+      anyObject[SpoutOutputCollector])
 
-    // next
+    // Next
     val message = mock[Message]
     gearpumpSpout.next(message)
 
@@ -93,16 +97,17 @@ class GearpumpStormComponentSpec extends PropSpec with PropertyChecks with Match
       val getTickTuple = mock[(GeneralTopologyContext, Int) => Tuple]
       val tickTuple = mock[Tuple]
       when(getTickTuple(mockitoEq(generalTopologyContext), anyObject[Int]())).thenReturn(tickTuple)
-      val gearpumpBolt = GearpumpBolt(config, bolt, getDAG, getTopologyContext, getGeneralTopologyContext,
-        getOutputCollector, getTickTuple, taskContext)
+      val gearpumpBolt = GearpumpBolt(config, bolt, getDAG, getTopologyContext,
+        getGeneralTopologyContext, getOutputCollector, getTickTuple, taskContext)
 
-      // start
+      // Start
       val startTime = mock[StartTime]
       gearpumpBolt.start(startTime)
 
-      verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext), anyObject[OutputCollector])
+      verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext),
+        anyObject[OutputCollector])
 
-      // next
+      // Next
       val gearpumpTuple = mock[GearpumpTuple]
       val tuple = mock[Tuple]
       when(gearpumpTuple.toTuple(generalTopologyContext, timestamp)).thenReturn(tuple)
@@ -112,11 +117,9 @@ class GearpumpStormComponentSpec extends PropSpec with PropertyChecks with Match
       verify(stormOutputCollector).setTimestamp(timestamp)
       verify(bolt).execute(tuple)
 
-
-      // tick
+      // Tick
       gearpumpBolt.tick(freq)
       verify(bolt).execute(tickTuple)
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
index 8f10886..f9ffc5f 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,16 +19,16 @@
 package io.gearpump.experiments.storm.topology
 
 import java.util.{HashMap => JHashMap, Map => JMap}
+import scala.collection.JavaConverters._
 
 import backtype.storm.Config
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, WordSpec}
+
 import io.gearpump.experiments.storm.processor.StormProcessor
 import io.gearpump.experiments.storm.producer.StormProducer
 import io.gearpump.experiments.storm.util.TopologyUtil
 import io.gearpump.streaming.MockUtil
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, WordSpec}
-
-import scala.collection.JavaConversions._
 
 class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar {
   import io.gearpump.experiments.storm.topology.GearpumpStormTopologySpec._
@@ -43,18 +43,19 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
       val appConfig = newJavaConfig(name, appVal)
 
       implicit val system = MockUtil.system
-      val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig, newEmptyConfig)
-      topology1.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology1"
+      val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig,
+        newEmptyConfig)
+      topology1.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology1"
       topology1.getStormConfig should not contain name
 
-      val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig, newEmptyConfig)
-      topology2.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology2"
+      val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig,
+        newEmptyConfig)
+      topology2.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology2"
       topology2.getStormConfig.get(name) shouldBe sysVal
 
       val topology3 = new GearpumpStormTopology("topology3", stormTopology, sysConfig, appConfig)
-      topology3.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology3"
+      topology3.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology3"
       topology3.getStormConfig.get(name) shouldBe appVal
-
     }
 
     "create Gearpump processors from Storm topology" in {
@@ -63,12 +64,12 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
       val gearpumpStormTopology =
         GearpumpStormTopology("app", stormTopology, null)
       val processors = gearpumpStormTopology.getProcessors
-      stormTopology.get_spouts().foreach { case (spoutId, _) =>
+      stormTopology.get_spouts().asScala.foreach { case (spoutId, _) =>
         val processor = processors(spoutId)
         processor.taskClass shouldBe classOf[StormProducer]
         processor.description shouldBe spoutId
       }
-      stormTopology.get_bolts().foreach { case (boltId, _) =>
+      stormTopology.get_bolts().asScala.foreach { case (boltId, _) =>
         val processor = processors(boltId)
         processor.taskClass shouldBe classOf[StormProcessor]
         processor.description shouldBe boltId
@@ -88,7 +89,6 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
       targets1 should contain key "3"
     }
   }
-
 }
 
 object GearpumpStormTopologySpec {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
index 4ccdd39..f454145 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,17 @@
 package io.gearpump.experiments.storm.topology
 
 import java.util.{List => JList}
+import scala.collection.JavaConverters._
 
 import backtype.storm.task.GeneralTopologyContext
 import backtype.storm.tuple.Fields
-import io.gearpump.TimeStamp
 import org.mockito.Mockito._
 import org.scalacheck.Gen
-import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
+import org.scalatest.{Matchers, PropSpec}
+
+import io.gearpump.TimeStamp
 
 class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
@@ -42,14 +42,14 @@ class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with
     forAll(tupleGen, Gen.alphaStr, Gen.chooseNum[Long](0, Long.MaxValue)) {
       (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: TimeStamp) =>
         val topologyContext = mock[GeneralTopologyContext]
-        val fields = new Fields(gearpumpTuple.values.map(_.asInstanceOf[String]): _*)
+        val fields = new Fields(gearpumpTuple.values.asScala.map(_.asInstanceOf[String]): _*)
         when(topologyContext.getComponentId(gearpumpTuple.sourceTaskId)).thenReturn(componentId)
         when(topologyContext.getComponentOutputFields(
           componentId, gearpumpTuple.sourceStreamId)).thenReturn(fields)
 
         val tuple = gearpumpTuple.toTuple(topologyContext, timestamp)
 
-        tuple shouldBe a [TimedTuple]
+        tuple shouldBe a[TimedTuple]
         val timedTuple = tuple.asInstanceOf[TimedTuple]
         timedTuple.getValues shouldBe gearpumpTuple.values
         timedTuple.getSourceTask shouldBe gearpumpTuple.sourceTaskId
@@ -60,5 +60,4 @@ class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with
         timedTuple.timestamp shouldBe timestamp
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala
index 4efe06c..27c02fe 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,13 +18,14 @@
 
 package io.gearpump.experiments.storm.util
 
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, WordSpec}
+
 import io.gearpump.experiments.storm.partitioner.StormPartitioner
 import io.gearpump.experiments.storm.topology.GearpumpStormTopology
 import io.gearpump.streaming.Processor
 import io.gearpump.streaming.task.Task
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, WordSpec}
 
 class GraphBuilderSpec extends WordSpec with Matchers with MockitoSugar {
 
@@ -47,7 +48,7 @@ class GraphBuilderSpec extends WordSpec with Matchers with MockitoSugar {
       graph.edges.size shouldBe 1
       val (from, edge, to) = graph.edges.head
       from shouldBe sourceProcessor
-      edge shouldBe a [StormPartitioner]
+      edge shouldBe a[StormPartitioner]
       to shouldBe targetProcessor
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala
index 7566f2f..599ea8d 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,18 +19,19 @@
 package io.gearpump.experiments.storm.util
 
 import java.util.{List => JList}
+import scala.collection.JavaConverters._
+
 import backtype.storm.generated.GlobalStreamId
 import backtype.storm.grouping.CustomStreamGrouping
 import backtype.storm.task.TopologyContext
 import backtype.storm.tuple.Fields
-import io.gearpump.experiments.storm.util.GrouperSpec.Value
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
 
-import scala.collection.JavaConverters._
+import io.gearpump.experiments.storm.util.GrouperSpec.Value
 
 class GrouperSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
@@ -77,7 +78,7 @@ class GrouperSpec extends PropSpec with PropertyChecks with Matchers with Mockit
           val grouper = new FieldsGrouper(outFields, groupFields, numTasks)
           grouper.getPartitions(taskId,
             values.toList.asJava.asInstanceOf[JList[AnyRef]])
-        }.distinct.size  shouldBe numTasks
+        }.distinct.size shouldBe numTasks
     }
   }
 
@@ -103,8 +104,8 @@ class GrouperSpec extends PropSpec with PropertyChecks with Matchers with Mockit
 
     verify(grouping).prepare(topologyContext, globalStreamId, sourceTasks)
 
-    forAll(taskIdGen, valuesGen, numTasksGen) { (taskId: Int, values: JList[AnyRef], numTasks: Int) =>
-      0.until(numTasks).foreach { i =>
+    forAll(taskIdGen, valuesGen, numTasksGen) {(taskId: Int, values: JList[AnyRef], taskNum: Int) =>
+      0.until(taskNum).foreach { i =>
         when(grouping.chooseTasks(taskId, values)).thenReturn(List(new Integer(i)).asJava)
         grouper.getPartitions(taskId, values) shouldBe List(i)
       }
@@ -114,7 +115,17 @@ class GrouperSpec extends PropSpec with PropertyChecks with Matchers with Mockit
 
 object GrouperSpec {
   class Value(val i: Int) extends AnyRef {
+
     override def toString: String = s"$i"
+
     override def hashCode(): Int = i
+
+    override def equals(other: Any): Boolean = {
+      if (other.isInstanceOf[Value]) {
+        this.i == other.asInstanceOf[Value].i
+      } else {
+        false
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
index c75e92c..1a28694 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,21 +17,23 @@
  */
 package io.gearpump.experiments.storm.util
 
-import java.util.{Map => JMap, List => JList}
+import java.util.{List => JList, Map => JMap}
+import scala.collection.JavaConverters._
+
 import backtype.storm.generated.Grouping
-import io.gearpump._
-import io.gearpump.experiments.storm.topology.GearpumpTuple
-import io.gearpump.streaming.MockUtil
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
 
-import scala.collection.JavaConverters._
+import io.gearpump._
+import io.gearpump.experiments.storm.topology.GearpumpTuple
+import io.gearpump.streaming.MockUtil
 
-class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+class StormOutputCollectorSpec
+  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
   val stormTaskId = 0
   val streamIdGen = Gen.alphaStr
@@ -43,13 +45,15 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher
       (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) =>
         val targets = mock[JMap[String, JMap[String, Grouping]]]
         val taskToComponent = mock[JMap[Integer, String]]
-        val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer])]
+        val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
+          (Map[String, Array[Int]], JList[Integer])]
         val targetPartitions = mock[Map[String, Array[Int]]]
         val targetStormTaskIds = mock[JList[Integer]]
-        when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, targetStormTaskIds))
+        when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions,
+          targetStormTaskIds))
         val taskContext = MockUtil.mockTaskContext
-        val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn,
-          taskContext, LatestTime)
+        val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
+          targets, getTargetPartitionsFn, taskContext, LatestTime)
 
         when(targets.containsKey(streamId)).thenReturn(false)
         stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST
@@ -59,12 +63,11 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher
         stormOutputCollector.setTimestamp(timestamp)
         stormOutputCollector.emit(streamId, values) shouldBe targetStormTaskIds
         verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
-           case Message(tuple: GearpumpTuple, t) =>
-             val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
-             tuple == expected && t == timestamp
+          case Message(tuple: GearpumpTuple, t) =>
+            val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
+            tuple == expected && t == timestamp
         }))
     }
-
   }
 
   property("StormOutputCollector emit direct to a task") {
@@ -75,13 +78,15 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher
         val targets = mock[JMap[String, JMap[String, Grouping]]]
         val taskToComponent = mock[JMap[Integer, String]]
         when(taskToComponent.get(id)).thenReturn(target)
-        val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer])]
+        val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
+          (Map[String, Array[Int]], JList[Integer])]
         val targetPartitions = mock[Map[String, Array[Int]]]
         val targetStormTaskIds = mock[JList[Integer]]
-        when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, targetStormTaskIds))
+        when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions,
+          targetStormTaskIds))
         val taskContext = MockUtil.mockTaskContext
-        val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn,
-          taskContext, LatestTime)
+        val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
+          targets, getTargetPartitionsFn, taskContext, LatestTime)
 
         when(targets.containsKey(streamId)).thenReturn(false)
         verify(taskContext, times(0)).output(anyObject[Message])
@@ -91,11 +96,14 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher
         stormOutputCollector.emitDirect(id, streamId, values)
         val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index)
         verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
-          case Message(tuple: GearpumpTuple, t) =>
-            val expected = new GearpumpTuple(values, stormTaskId, streamId, Map(target -> partitions))
-            tuple == expected && t == timestamp
+          case Message(tuple: GearpumpTuple, t) => {
+            val expected = new GearpumpTuple(values, stormTaskId, streamId,
+            Map(target -> partitions))
+
+            val result = tuple == expected && t == timestamp
+            result
+          }
         }))
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
index f42b920..d8a29e8 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,20 +19,20 @@
 package io.gearpump.experiments.storm.util
 
 import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
+import scala.collection.JavaConverters._
 
 import akka.actor.ExtendedActorSystem
 import backtype.storm.utils.Utils
 import com.esotericsoftware.kryo.Kryo
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.storm.topology.GearpumpTuple
-import io.gearpump.experiments.storm.util.StormConstants._
-import io.gearpump.streaming.MockUtil
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import scala.collection.JavaConverters._
+import io.gearpump.cluster.UserConfig
+import io.gearpump.experiments.storm.topology.GearpumpTuple
+import io.gearpump.experiments.storm.util.StormConstants._
+import io.gearpump.streaming.MockUtil
 
 class StormSerializerPoolSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
@@ -44,7 +44,7 @@ class StormSerializerPoolSpec extends PropSpec with PropertyChecks with Matchers
     val stormConfig = Utils.readDefaultConfig.asInstanceOf[JMap[AnyRef, AnyRef]]
     val config = UserConfig.empty.withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, stormConfig)
     serializerPool.init(system, config)
-    serializerPool.get shouldBe a [StormSerializer]
+    serializerPool.get shouldBe a[StormSerializer]
   }
 
   property("StormSerializer should serialize and deserialize GearpumpTuple") {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala
index 610b701..a9b93c9 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,27 +20,25 @@ package io.gearpump.experiments.storm.util
 
 import java.lang.{Boolean => JBoolean, Long => JLong}
 import java.util.{HashMap => JHashMap, Map => JMap}
+import scala.collection.JavaConverters._
 
 import backtype.storm.Config
 import backtype.storm.generated.StormTopology
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
-import io.gearpump.experiments.storm.util.StormConstants._
-import io.gearpump.experiments.storm.util.StormUtil._
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.TaskId
 import org.apache.storm.shade.org.json.simple.JSONValue
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
+import io.gearpump.cluster.UserConfig
+import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
+import io.gearpump.experiments.storm.util.StormConstants._
+import io.gearpump.experiments.storm.util.StormUtil._
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.task.TaskId
 
 class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
-
   property("convert Storm task ids to gearpump TaskIds and back") {
     val idGen = Gen.chooseNum[Int](0, Int.MaxValue)
     forAll(idGen) { (stormTaskId: Int) =>
@@ -60,23 +58,23 @@ class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with Mock
     val topology = TopologyUtil.getTestTopology
     implicit val actorSystem = taskContext.system
     val userConfig = UserConfig.empty
-          .withValue[StormTopology](STORM_TOPOLOGY, topology)
-          .withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, new JHashMap[AnyRef, AnyRef])
-    topology.get_spouts foreach { case (spoutId, _) =>
+      .withValue[StormTopology](STORM_TOPOLOGY, topology)
+      .withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, new JHashMap[AnyRef, AnyRef])
+    topology.get_spouts.asScala.foreach { case (spoutId, _) =>
       val config = userConfig.withString(STORM_COMPONENT, spoutId)
       val component = getGearpumpStormComponent(taskContext, config)(taskContext.system)
-      component shouldBe a [GearpumpSpout]
+      component shouldBe a[GearpumpSpout]
     }
-    topology.get_bolts foreach { case (boltId, _) =>
+    topology.get_bolts.asScala.foreach { case (boltId, _) =>
       val config = userConfig.withString(STORM_COMPONENT, boltId)
       val component = getGearpumpStormComponent(taskContext, config)(taskContext.system)
-      component shouldBe a [GearpumpBolt]
+      component shouldBe a[GearpumpBolt]
     }
   }
 
   property("parse json to map") {
     val mapGen = Gen.listOf[String](Gen.alphaStr)
-        .map(_.map(s => (s, s)).toMap.asJava.asInstanceOf[JMap[AnyRef, AnyRef]])
+      .map(_.map(s => (s, s)).toMap.asJava.asInstanceOf[JMap[AnyRef, AnyRef]])
 
     forAll(mapGen) { (map: JMap[AnyRef, AnyRef]) =>
       parseJsonStringToMap(JSONValue.toJSONString(map)) shouldBe map
@@ -86,7 +84,7 @@ class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with Mock
     forAll(invalidJsonGen) { (invalidJson: String) =>
       val map = parseJsonStringToMap(invalidJson)
       map shouldBe empty
-      map shouldBe a [JMap[_, _]]
+      map shouldBe a[JMap[_, _]]
     }
   }
 
@@ -109,7 +107,7 @@ class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with Mock
 
     forAll(Gen.alphaStr) { (s: String) =>
       conf.put(name, s)
-      an [IllegalArgumentException] should be thrownBy getInt(conf, name)
+      an[IllegalArgumentException] should be thrownBy getInt(conf, name)
     }
   }
 
@@ -127,7 +125,7 @@ class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with Mock
 
     forAll(Gen.alphaStr) { (s: String) =>
       conf.put(name, s)
-      an [IllegalArgumentException] should be thrownBy getBoolean(conf, name)
+      an[IllegalArgumentException] should be thrownBy getBoolean(conf, name)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala
index 24cce69..8491786 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
index 989cfaa..6618b48 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
index 6c38bde..af871ab 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,11 +19,13 @@
 package io.gearpump.experiments.yarn.appmaster
 
 import com.typesafe.config.Config
+
 import io.gearpump.cluster.main.{Master, Worker}
 import io.gearpump.experiments.yarn.Constants._
 import io.gearpump.transport.HostPort
 import io.gearpump.util.Constants
 
+/** Command to start a YARN container */
 trait Command {
   def get: String
   override def toString: String = get
@@ -32,22 +34,26 @@ trait Command {
 abstract class AbstractCommand extends Command {
   protected def config: Config
   def version: String
-  def classPath = Array(
-    s"conf",
-    s"pack/$version/conf",
-    s"pack/$version/lib/daemon/*",
-    s"pack/$version/lib/*"
-  )
+  def classPath: Array[String] = {
+    Array(
+      s"conf",
+      s"pack/$version/conf",
+      s"pack/$version/lib/daemon/*",
+      s"pack/$version/lib/*"
+    )
+  }
 
-  protected def buildCommand(java: String, properties: Array[String], mainClazz: String, cliOpts: Array[String]):String = {
+  protected def buildCommand(
+      java: String, properties: Array[String], mainClazz: String, cliOpts: Array[String])
+    : String = {
     val exe = config.getString(java)
 
     s"$exe -cp ${classPath.mkString(":")}:" +
       "$CLASSPATH " + properties.mkString(" ") +
-      s"  $mainClazz ${cliOpts.mkString(" ")} 2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr"
+      s" $mainClazz ${cliOpts.mkString(" ")} 2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr"
   }
 
-  protected def clazz(any: AnyRef) : String = {
+  protected def clazz(any: AnyRef): String = {
     val name = any.getClass.getName
     if (name.endsWith("$")) {
       name.dropRight(1)
@@ -57,10 +63,11 @@ abstract class AbstractCommand extends Command {
   }
 }
 
-case class MasterCommand(config: Config, version: String, masterAddr: HostPort) extends AbstractCommand {
+case class MasterCommand(config: Config, version: String, masterAddr: HostPort)
+  extends AbstractCommand {
 
   def get: String = {
-    val masterArguments = Array(s"-ip ${masterAddr.host}",  s"-port ${masterAddr.port}")
+    val masterArguments = Array(s"-ip ${masterAddr.host}", s"-port ${masterAddr.port}")
 
     val properties = Array(
       s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=${masterAddr.host}:${masterAddr.port}",
@@ -74,7 +81,8 @@ case class MasterCommand(config: Config, version: String, masterAddr: HostPort)
   }
 }
 
-case class WorkerCommand(config: Config, version: String, masterAddr: HostPort, workerHost: String) extends AbstractCommand {
+case class WorkerCommand(config: Config, version: String, masterAddr: HostPort, workerHost: String)
+  extends AbstractCommand {
 
   def get: String = {
     val properties = Array(
@@ -85,11 +93,12 @@ case class WorkerCommand(config: Config, version: String, masterAddr: HostPort,
       s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}",
       s"-D${Constants.GEARPUMP_HOSTNAME}=$workerHost")
 
-    buildCommand(WORKER_COMMAND, properties,  clazz(Worker), Array.empty[String])
+    buildCommand(WORKER_COMMAND, properties, clazz(Worker), Array.empty[String])
   }
 }
 
-case class AppMasterCommand(config: Config, version: String, args: Array[String]) extends AbstractCommand {
+case class AppMasterCommand(config: Config, version: String, args: Array[String])
+  extends AbstractCommand {
 
   override val classPath = Array(
     "conf",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
index 92f85b6..6dd5011 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,28 +18,30 @@
 
 package io.gearpump.experiments.yarn.appmaster
 
+import scala.concurrent.Future
+
 import akka.actor._
 import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
+
 import io.gearpump.cluster.ClusterConfig
 import io.gearpump.services.main.Services
 import io.gearpump.transport.HostPort
 import io.gearpump.util.{ActorUtil, Constants, LogUtil}
 
-import scala.concurrent.Future
-
 trait UIFactory {
   def props(masters: List[HostPort], host: String, port: Int): Props
 }
 
+/** Wrapper of UI server */
 class UIService(masters: List[HostPort], host: String, port: Int) extends Actor {
   private val LOG = LogUtil.getLogger(getClass)
 
   private val supervisor = ActorUtil.getFullPath(context.system, context.parent.path)
   private var configFile: java.io.File = null
 
-  implicit val dispatcher = context.dispatcher
+  private implicit val dispatcher = context.dispatcher
 
-  override def postStop: Unit = {
+  override def postStop(): Unit = {
     if (configFile != null) {
       configFile.delete()
       configFile = null
@@ -51,22 +53,20 @@ class UIService(masters: List[HostPort], host: String, port: Int) extends Actor
   }
 
   override def preStart(): Unit = {
-    Future(start)
+    Future(start())
   }
 
-  def start: Unit = {
+  def start(): Unit = {
     val mastersArg = masters.mkString(",")
     LOG.info(s"Launching services -master $mastersArg")
 
     configFile = java.io.File.createTempFile("uiserver", ".conf")
 
-
     val config = context.system.settings.config.
       withValue(Constants.GEARPUMP_SERVICE_HOST, ConfigValueFactory.fromAnyRef(host)).
       withValue(Constants.GEARPUMP_SERVICE_HTTP, ConfigValueFactory.fromAnyRef(port.toString)).
       withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(host))
 
-
     ClusterConfig.saveConfig(config, configFile)
 
     val master = masters.head
@@ -75,9 +75,10 @@ class UIService(masters: List[HostPort], host: String, port: Int) extends Actor
     launch(supervisor, master.host, master.port, configFile.toString)
   }
 
+  // Launch the UI server
   def launch(supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = {
     Services.main(Array("-supervisor", supervisor, "-master", s"$masterHost:$masterPort"
-     , "-conf", configFile))
+      , "-conf", configFile))
   }
 
   override def receive: Receive = {
@@ -86,7 +87,7 @@ class UIService(masters: List[HostPort], host: String, port: Int) extends Actor
   }
 }
 
-object UIService extends UIFactory{
+object UIService extends UIFactory {
   override def props(masters: List[HostPort], host: String, port: Int): Props = {
     Props(new UIService(masters, host, port))
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
index f8982ce..1df7fb9 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,10 +20,17 @@ package io.gearpump.experiments.yarn.appmaster
 
 import java.io.IOException
 import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
 
 import akka.actor._
 import akka.util.Timeout
 import com.typesafe.config.ConfigValueFactory
+import org.apache.commons.httpclient.HttpClient
+import org.apache.commons.httpclient.methods.GetMethod
+import org.slf4j.Logger
+
 import io.gearpump.cluster.ClientToMaster._
 import io.gearpump.cluster.ClusterConfig
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
@@ -32,13 +39,16 @@ import io.gearpump.experiments.yarn.glue.Records._
 import io.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig}
 import io.gearpump.transport.HostPort
 import io.gearpump.util._
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
-import org.slf4j.Logger
 
+/**
+ * YARN AppMaster. YARN AppMaster is responsible to start Gearpump masters, workers, UI server as
+ * YARN containers.
+ *
+ * NOTE: It is different with Gearpump AppMaster. Gearpump AppMaster is a sub-process of worker.
+ */
 class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
-                    packagePath: String, hdfsConfDir: String,
-                    uiFactory: UIFactory)
+    packagePath: String, hdfsConfDir: String,
+    uiFactory: UIFactory)
   extends Actor {
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
@@ -47,11 +57,11 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
   private var uiStarted = false
   private val host = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
 
-  val port = Util.findFreePort.get
+  private val port = Util.findFreePort().get
 
   private val trackingURL = "http://" + host + ":" + port
 
-  //TODO: for now, only one master is supported.
+  // TODO: for now, only one master is supported.
   private val masterCount = 1
   private val masterMemory = akkaConf.getString(MASTER_MEMORY).toInt
   private val masterVCores = akkaConf.getString(MASTER_VCORES).toInt
@@ -67,7 +77,7 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
 
   def receive: Receive = null
 
-  private def registerAppMaster: Unit = {
+  private def registerAppMaster(): Unit = {
     val target = host + ":" + port
     rmClient.registerAppMaster(host, port, trackingURL)
   }
@@ -75,7 +85,7 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
   registerAppMaster
   context.become(waitForAppMasterRegistered)
 
-  import YarnAppMaster._
+  import io.gearpump.experiments.yarn.appmaster.YarnAppMaster._
 
   def waitForAppMasterRegistered: Receive = {
     case AppMasterRegistered =>
@@ -86,15 +96,16 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
 
   private def startingMasters(remain: Int, masters: List[MasterInfo]): Receive = box {
     case ContainersAllocated(containers) =>
-      LOG.info(s"ContainersAllocated: containers allocated for master(remain=$remain), count: " + containers.size)
+      LOG.info(s"ContainersAllocated: containers allocated for master(remain=$remain), count: "
+        + containers.size)
       val count = Math.min(containers.length, remain)
-      val newMasters = (0 until count).toList.map{index =>
+      val newMasters = (0 until count).toList.map { index =>
         val container = containers(index)
         MasterInfo(container.getId, container.getNodeId, launchMaster(container))
       }
 
-      // stop un-used containers
-      containers.drop(count).map{container =>
+      // Stops un-used containers
+      containers.drop(count).map { container =>
         nmClient.stopContainer(container.getId, container.getNodeId)
       }
 
@@ -113,26 +124,28 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
     onError orElse receive orElse unHandled
   }
 
-  private def startingWorkers(remain: Int, masters: List[MasterInfo], workers: List[WorkerInfo]): Receive =
+  private def startingWorkers(remain: Int, masters: List[MasterInfo], workers: List[WorkerInfo])
+  : Receive = {
     box {
       case ContainersAllocated(containers) =>
-        LOG.info(s"ContainersAllocated: containers allocated for workers(remain=$remain), count: " + containers.size)
+        LOG.info(s"ContainersAllocated: containers allocated for workers(remain=$remain), " +
+          s"count: " + containers.size)
 
         val count = Math.min(containers.length, remain)
-        val newWorkers = (0 until count).toList.map{index =>
+        val newWorkers = (0 until count).toList.map { index =>
           val container = containers(index)
           launchWorker(container, masters)
           WorkerInfo(container.getId, container.getNodeId)
         }
 
-        // stop un-used containers
-        containers.drop(count).map{container =>
+        // Stops un-used containers
+        containers.drop(count).map { container =>
           nmClient.stopContainer(container.getId, container.getNodeId)
         }
         context.become(startingWorkers(remain, masters, workers ++ newWorkers))
       case ContainerStarted(containerId) =>
         LOG.info(s"ContainerStarted: container $containerId started for worker(remain=$remain)")
-        // the last one
+        // The last one
         if (remain > 1) {
           context.become(startingWorkers(remain - 1, masters, workers))
         } else {
@@ -143,21 +156,22 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
           context.become(service(effectiveConfig(masters.map(_.host)), masters, workers))
         }
     }
-
-  import scala.collection.JavaConversions._
+  }
 
   private def effectiveConfig(masters: List[HostPort]): Config = {
     val masterList = masters.map(pair => s"${pair.host}:${pair.port}")
     val config = context.system.settings.config
-    config.withValue(Constants.GEARPUMP_CLUSTER_MASTERS, ConfigValueFactory.fromIterable(masterList))
+    config.withValue(Constants.GEARPUMP_CLUSTER_MASTERS,
+      ConfigValueFactory.fromIterable(masterList.asJava))
   }
 
   private def onError: Receive = {
     case ContainersCompleted(containers) =>
-      //TODO: we should recover the failed container from this...
+      // TODO: we should recover the failed container from this...
       containers.foreach { status =>
         if (status.getExitStatus != 0) {
-          LOG.error(s"ContainersCompleted: container ${status.getContainerId} failed with exit code ${status.getExitStatus}, msg: ${status.getDiagnostics}")
+          LOG.error(s"ContainersCompleted: container ${status.getContainerId}" +
+            s" failed with exit code ${status.getExitStatus}, msg: ${status.getDiagnostics}")
         } else {
           LOG.info(s"ContainersCompleted: container ${status.getContainerId} completed")
         }
@@ -178,22 +192,24 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
       self ! ShutdownApplication
   }
 
-  private def service(config: Config, masters: List[MasterInfo], workers: List[WorkerInfo]): Receive  = box {
+  private def service(config: Config, masters: List[MasterInfo], workers: List[WorkerInfo])
+  : Receive = box {
     case GetActiveConfig(clientHost) =>
       LOG.info("GetActiveConfig: Get active configuration for client: " + clientHost)
-      val filtered = ClusterConfig.filterOutDefaultConfig(config.withValue(Constants.GEARPUMP_HOSTNAME,
-        ConfigValueFactory.fromAnyRef(clientHost)))
+      val filtered = ClusterConfig.filterOutDefaultConfig(
+        config.withValue(Constants.GEARPUMP_HOSTNAME,
+          ConfigValueFactory.fromAnyRef(clientHost)))
       sender ! ActiveConfig(filtered)
     case QueryVersion =>
       LOG.info("QueryVersion")
       sender ! Version(Util.version)
     case QueryClusterInfo =>
       LOG.info("QueryClusterInfo")
-      val masterContainers = masters.map{master =>
+      val masterContainers = masters.map { master =>
         master.id.toString + s"(${master.nodeId.toString})"
       }
 
-      val workerContainers = workers.map{worker=>
+      val workerContainers = workers.map { worker =>
         worker.id.toString + s"(${worker.nodeId.toString})"
       }
       sender ! ClusterInfo(masterContainers, workerContainers)
@@ -242,7 +258,7 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
     LOG.info(s"Launch Master on container " + container.getNodeHttpAddress)
     val host = container.getNodeId.getHost
 
-    val port = Util.findFreePort.get
+    val port = Util.findFreePort().get
 
     LOG.info("=============PORT" + port)
     val masterCommand = MasterCommand(akkaConf, rootPath, HostPort(host, port))
@@ -297,7 +313,7 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser {
       nmClient, packagePath, confDir, UIService)))
 
     val daemon = system.actorOf(Props(new Daemon(appMaster)))
-    system.awaitTermination()
+    Await.result(system.whenTerminated, Duration.Inf)
     LOG.info("YarnAppMaster is shutdown")
   }
 
@@ -307,8 +323,9 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser {
     override def receive: Actor.Receive = {
       case Terminated(actor) =>
         if (actor.compareTo(appMaster) == 0) {
-          LOG.info(s"YarnAppMaster ${appMaster.path.toString} is terminated, shutting down current ActorSystem")
-          context.system.shutdown()
+          LOG.info(s"YarnAppMaster ${appMaster.path.toString} is terminated, " +
+            s"shutting down current ActorSystem")
+          context.system.terminate()
           context.stop(self)
         }
     }
@@ -353,7 +370,7 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser {
     var status = client.executeMethod(get)
 
     if (status != 200) {
-      // sleep a little bit, and try again
+      // Sleeps a little bit, and try again
       Thread.sleep(3000)
       status = client.executeMethod(get)
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
index 9f30570..49ec3a0 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,19 +19,18 @@
 package io.gearpump.experiments.yarn.client
 
 import java.io.IOException
+import scala.util.Try
 
 import akka.actor.{ActorRef, ActorSystem}
-import io.gearpump.experiments.yarn.glue.Records.ApplicationId
-import io.gearpump.experiments.yarn.glue.YarnClient
-import io.gearpump.util.{AkkaHelper, LogUtil}
 import org.apache.commons.httpclient.HttpClient
 import org.apache.commons.httpclient.methods.GetMethod
-import org.slf4j.Logger
 
-import scala.util.Try
+import io.gearpump.experiments.yarn.glue.Records.ApplicationId
+import io.gearpump.experiments.yarn.glue.YarnClient
+import io.gearpump.util.{AkkaHelper, LogUtil}
 
 /**
- * Resolve AppMaster ActorRef
+ * Resolves AppMaster ActorRef
  */
 class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
   val LOG = LogUtil.getLogger(getClass)
@@ -67,7 +66,8 @@ class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
       index += 1
       val tryConnect = Try(fun)
       if (tryConnect.isFailure) {
-        Console.err.println(s"Failed to connect YarnAppMaster(tried $index)... "  + tryConnect.failed.get.getMessage)
+        LOG.error(s"Failed to connect YarnAppMaster(tried $index)... " +
+          tryConnect.failed.get.getMessage)
       } else {
         result = tryConnect.get
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
index 2c3f99e..7c3bc38 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,10 +16,12 @@
  * limitations under the License.
  */
 package io.gearpump.experiments.yarn.client
-import io.gearpump.util.LogUtil
 import org.slf4j.Logger
 
-object Client  {
+import io.gearpump.util.LogUtil
+
+/** Command line tool to launch a Gearpump cluster on YARN, and also to manage Gearpump cluster */
+object Client {
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
   val LAUNCH = "launch"
@@ -27,14 +29,16 @@ object Client  {
   val commands = Map(LAUNCH -> LaunchCluster) ++
     ManageCluster.commands.map(key => (key, ManageCluster)).toMap
 
-  def usage: Unit = {
+  def usage(): Unit = {
     val keys = commands.keys.toList.sorted
+    // scalastyle:off println
     Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
+    // scalastyle:on println
   }
 
-  def main(args: Array[String]) = {
+  def main(args: Array[String]): Unit = {
     if (args.length == 0) {
-      usage
+      usage()
     } else {
       val key = args(0)
       val command = commands.get(key)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
index 0ff4f47..ea5e707 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,9 +20,13 @@ package io.gearpump.experiments.yarn.client
 import java.io.{File, IOException, OutputStreamWriter}
 import java.net.InetAddress
 import java.util.zip.ZipInputStream
-import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
 import akka.actor.ActorSystem
 import com.typesafe.config.{Config, ConfigValueFactory}
+import org.slf4j.Logger
+
 import io.gearpump.cluster.ClusterConfig
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import io.gearpump.experiments.yarn
@@ -33,13 +37,9 @@ import io.gearpump.experiments.yarn.glue.Records.{ApplicationId, Resource}
 import io.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
 import io.gearpump.util.ActorUtil.askActor
 import io.gearpump.util.{AkkaApp, LogUtil, Util}
-import org.slf4j.Logger
-
-import scala.concurrent.Await
 
 /**
  * Launch Gearpump on YARN
- *
  */
 class LaunchCluster(
     akka: Config,
@@ -50,7 +50,7 @@ class LaunchCluster(
     appMasterResolver: AppMasterResolver,
     version: String = Util.version) {
 
-  import yarn.Constants._
+  import io.gearpump.experiments.yarn.Constants._
   private implicit val dispatcher = actorSystem.dispatcher
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
@@ -59,13 +59,13 @@ class LaunchCluster(
   private val memory = akka.getString(APPMASTER_MEMORY).toInt
   private val vcore = akka.getString(APPMASTER_VCORES).toInt
 
-
   def submit(appName: String, packagePath: String): ApplicationId = {
     LOG.info("Starting AM")
 
-    // first step, check the version, to make sure local version matches remote version
+    // First step, check the version, to make sure local version matches remote version
     if (!packagePath.endsWith(".zip")) {
-      throw new IOException(s"YarnClient only support .zip distribution package, now it is ${packagePath}. Please download the zip " +
+      throw new IOException(s"YarnClient only support .zip distribution package," +
+        s" now it is ${packagePath}. Please download the zip " +
         "package from website or use sbt assembly packArchiveZip to build one.")
     }
 
@@ -76,24 +76,30 @@ class LaunchCluster(
     val rootEntry = rootEntryPath(zip = packagePath)
 
     if (!rootEntry.contains(version)) {
-      throw new IOException(s"Check version failed! Local gearpump binary version $version doesn't match with remote path $packagePath")
+      throw new IOException(s"Check version failed! Local gearpump binary" +
+        s" version $version doesn't match with remote path $packagePath")
     }
 
     val resource = Resource.newInstance(memory, vcore)
     val appId = yarnClient.createApplication
 
-    // will upload the configs to HDFS home directory of current user.
+    // uploads the configs to HDFS home directory of current user.
     val configPath = uploadConfigToHDFS(appId)
 
-    val command = AppMasterCommand(akka, rootEntry, Array(s"-conf $configPath", s"-package $packagePath"))
+    val command = AppMasterCommand(akka, rootEntry, Array(s"-conf $configPath",
+      s"-package $packagePath"))
 
     yarnClient.submit(appName, appId, command.get, resource, queue, packagePath, configPath)
 
     LOG.info("Waiting application to finish...")
     val report = yarnClient.awaitApplication(appId, LaunchCluster.TIMEOUT_MILLISECONDS)
-    LOG.info(s"Application $appId finished with state ${report.getYarnApplicationState} at ${report.getFinishTime}, info: ${report.getDiagnostics}")
+    LOG.info(s"Application $appId finished with state ${report.getYarnApplicationState} " +
+      s"at ${report.getFinishTime}, info: ${report.getDiagnostics}")
+
+    // scalastyle:off println
     Console.println("================================================")
     Console.println("==Application Id: " + appId)
+    // scalastyle:on println
     appId
   }
 
@@ -103,21 +109,20 @@ class LaunchCluster(
     val appMaster = appMasterResolver.resolve(appId)
     LOG.info(s"appMaster=${appMaster.path} host=$host")
     val future = askActor[ActiveConfig](appMaster, GetActiveConfig(host)).map(_.config)
-    import scala.concurrent.duration._
-    future.map{ config =>
-      val out =  new File(output)
+    future.map { config =>
+      val out = new File(output)
       ClusterConfig.saveConfig(config, out)
       out
     }
   }
 
   private def uploadConfigToHDFS(appId: ApplicationId): String = {
-    // will use personal home directory so that it will not conflict with other users
+    // Uses personal home directory so that it will not conflict with other users
     // conf path pattern: /user/<userid>/.gearpump_application_<timestamp>_<id>/conf
     val confDir = s"${fs.getHomeDirectory}/.gearpump_${appId}/conf/"
     LOG.info(s"Uploading configuration files to remote HDFS(under $confDir)...")
 
-    // copy config from local to remote.
+    // Copies config from local to remote.
     val remoteConfFile = s"$confDir/gear.conf"
     var out = fs.create(remoteConfFile)
     var writer = new OutputStreamWriter(out)
@@ -127,14 +132,14 @@ class LaunchCluster(
     writer.write(cleanedConfig.root().render())
     writer.close()
 
-    // save yarn-site.xml to remote
+    // Saves yarn-site.xml to remote
     val yarn_site_xml = s"$confDir/yarn-site.xml"
     out = fs.create(yarn_site_xml)
     writer = new OutputStreamWriter(out)
     yarnConf.writeXml(writer)
     writer.close()
 
-    // save log4j.properties to remote
+    // Saves log4j.properties to remote
     val log4j_properties = s"$confDir/log4j.properties"
     val log4j = LogUtil.loadConfiguration
     out = fs.create(log4j_properties)
@@ -164,17 +169,21 @@ object LaunchCluster extends AkkaApp with ArgumentsParser {
   }
 
   override val options: Array[(String, CLIOption[Any])] = Array(
-    PACKAGE -> CLIOption[String]("<Please specify the gearpump.zip package path on HDFS. If not specified, we will use default value /user/gearpump/gearpump.zip>", required = false),
-    NAME -> CLIOption[String]("<Application name showed in YARN>", required = false, defaultValue = Some("Gearpump")),
-    VERBOSE -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)),
-    OUTPUT -> CLIOption("<output path for configuration file>", required = false, defaultValue = None)
+    PACKAGE -> CLIOption[String]("<Please specify the gearpump.zip package path on HDFS. " +
+      "If not specified, we will use default value /user/gearpump/gearpump.zip>", required = false),
+    NAME -> CLIOption[String]("<Application name showed in YARN>", required = false,
+      defaultValue = Some("Gearpump")),
+    VERBOSE -> CLIOption("<print verbose log on console>", required = false,
+      defaultValue = Some(false)),
+    OUTPUT -> CLIOption("<output path for configuration file>", required = false,
+      defaultValue = None)
   )
   private val TIMEOUT_MILLISECONDS = 30 * 1000
 
   override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
     val parsed = parse(args)
     if (parsed.getBoolean(VERBOSE)) {
-      LogUtil.verboseLogToConsole
+      LogUtil.verboseLogToConsole()
     }
 
     val yarnConfig = new YarnConfig()
@@ -184,24 +193,27 @@ object LaunchCluster extends AkkaApp with ArgumentsParser {
     val actorSystem = ActorSystem("launchCluster", akkaConf)
     val appMasterResolver = new AppMasterResolver(yarnClient, actorSystem)
 
-    val client = new LaunchCluster(akkaConf, yarnConfig, yarnClient, fs, actorSystem, appMasterResolver)
+    val client = new LaunchCluster(akkaConf, yarnConfig, yarnClient, fs,
+      actorSystem, appMasterResolver)
 
     val name = parsed.getString(NAME)
     val appId = client.submit(name, akkaConf.getString(Constants.PACKAGE_PATH))
 
     if (parsed.exists(OUTPUT)) {
       import scala.concurrent.duration._
-      Await.result(client.saveConfig(appId, parsed.getString(OUTPUT)), TIMEOUT_MILLISECONDS milliseconds)
+      Await.result(client.saveConfig(appId, parsed.getString(OUTPUT)),
+        TIMEOUT_MILLISECONDS.milliseconds)
     }
 
-    yarnClient.stop
-    actorSystem.shutdown()
-    actorSystem.awaitTermination()
+    yarnClient.stop()
+    actorSystem.terminate()
+    Await.result(actorSystem.whenTerminated, Duration.Inf)
   }
 
   private def updateConf(akka: Config, parsed: ParseResult): Config = {
     if (parsed.exists(PACKAGE)) {
-      akka.withValue(Constants.PACKAGE_PATH, ConfigValueFactory.fromAnyRef(parsed.getString(PACKAGE)))
+      akka.withValue(Constants.PACKAGE_PATH,
+        ConfigValueFactory.fromAnyRef(parsed.getString(PACKAGE)))
     } else {
       akka
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
index 20fd13e..3e50abe 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,8 +20,10 @@ package io.gearpump.experiments.yarn.client
 
 import java.io.{File, IOException}
 import java.net.InetAddress
+import scala.concurrent.{Await, Future}
 
 import akka.actor.{ActorRef, ActorSystem}
+
 import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
 import io.gearpump.cluster.ClusterConfig
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
@@ -31,18 +33,23 @@ import io.gearpump.experiments.yarn.glue.{YarnClient, YarnConfig}
 import io.gearpump.util.ActorUtil.askActor
 import io.gearpump.util.{AkkaApp, LogUtil}
 
-import scala.concurrent.{Await, Future}
-
+/** Manage current Gearpump cluster on YARN */
 class ManageCluster(appId: ApplicationId, appMaster: ActorRef, system: ActorSystem) {
-  import ManageCluster._
+  import io.gearpump.experiments.yarn.client.ManageCluster._
 
   private val host = InetAddress.getLocalHost.getHostName
   implicit val dispatcher = system.dispatcher
 
   def getConfig: Future[ActiveConfig] = askActor[ActiveConfig](appMaster, GetActiveConfig(host))
   def version: Future[Version] = askActor[Version](appMaster, QueryVersion)
-  def addWorker(count: Int): Future[CommandResult] = askActor[CommandResult](appMaster, AddWorker(count))
-  def removeWorker(worker: String): Future[CommandResult] = askActor[CommandResult](appMaster, RemoveWorker(worker))
+  def addWorker(count: Int): Future[CommandResult] = {
+    askActor[CommandResult](appMaster, AddWorker(count))
+  }
+
+  def removeWorker(worker: String): Future[CommandResult] = {
+    askActor[CommandResult](appMaster, RemoveWorker(worker))
+  }
+
   def shutdown: Future[CommandResult] = askActor[CommandResult](appMaster, Kill)
   def queryClusterInfo: Future[ClusterInfo] = askActor[ClusterInfo](appMaster, QueryClusterInfo)
 
@@ -91,18 +98,23 @@ object ManageCluster extends AkkaApp with ArgumentsParser {
   val APPID = "appid"
   val VERBOSE = "verbose"
 
-  val commands = List(GET_CONFIG,ADD_WORKER, REMOVE_WORKER, KILL,VERSION, QUERY)
+  val commands = List(GET_CONFIG, ADD_WORKER, REMOVE_WORKER, KILL, VERSION, QUERY)
 
   import scala.concurrent.duration._
-  val TIME_OUT_SECONDS = 30 seconds
+  val TIME_OUT_SECONDS = 30.seconds
 
   override val options: Array[(String, CLIOption[Any])] = Array(
     COMMAND -> CLIOption[String](s"<${commands.mkString("|")}>", required = true),
-    APPID ->CLIOption[String]("<Application id, format: application_timestamp_id>", required = true),
-    COUNT -> CLIOption("<how many instance to add or remove>", required = false, defaultValue = Some(1)),
-    VERBOSE -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)),
-    OUTPUT -> CLIOption("<output path for configuration file>", required = false, defaultValue = Some("")),
-    CONTAINER -> CLIOption("<container id for master or worker>", required = false, defaultValue = Some(""))
+    APPID -> CLIOption[String]("<Application id, format: application_timestamp_id>",
+    required = true),
+    COUNT -> CLIOption("<how many instance to add or remove>", required = false,
+      defaultValue = Some(1)),
+    VERBOSE -> CLIOption("<print verbose log on console>", required = false,
+      defaultValue = Some(false)),
+    OUTPUT -> CLIOption("<output path for configuration file>", required = false,
+      defaultValue = Some("")),
+    CONTAINER -> CLIOption("<container id for master or worker>", required = false,
+      defaultValue = Some(""))
   )
 
   override def main(akkaConf: Config, args: Array[String]): Unit = {
@@ -113,7 +125,7 @@ object ManageCluster extends AkkaApp with ArgumentsParser {
     val parsed = parse(args)
 
     if (parsed.getBoolean(VERBOSE)) {
-      LogUtil.verboseLogToConsole
+      LogUtil.verboseLogToConsole()
     }
 
     val appId = parseAppId(parsed.getString(APPID))
@@ -128,9 +140,11 @@ object ManageCluster extends AkkaApp with ArgumentsParser {
     val command = parsed.getString(COMMAND)
     val result = manager.command(command, parsed)
 
+    // scalastyle:off println
     Console.println(Await.result(result, TIME_OUT_SECONDS))
-    system.shutdown()
-    system.awaitTermination()
+    // scalastyle:on println
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   def parseAppId(str: String): ApplicationId = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
index 93cee28..6b3385f 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
@@ -1,4 +1,4 @@
-/*-
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,8 +20,8 @@ package io.gearpump.experiments.yarn.glue
 
 import java.io.File
 import java.nio.ByteBuffer
+import scala.collection.JavaConverters._
 
-import io.gearpump.util.LogUtil
 import org.apache.hadoop.fs.{FileSystem => YarnFileSystem, Path}
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.mapreduce.security.TokenCache
@@ -32,18 +32,19 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.slf4j.Logger
 
-import scala.collection.JavaConversions._
+import io.gearpump.util.LogUtil
 
-private [glue]
+private[glue]
 object ContainerLaunchContext {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
-  def apply(yarnConf: YarnConfiguration, command: String, packagePath: String, configPath: String): ContainerLaunchContext = {
+  def apply(yarnConf: YarnConfiguration, command: String, packagePath: String, configPath: String)
+    : ContainerLaunchContext = {
     val context = Records.newRecord(classOf[ContainerLaunchContext])
-    context.setCommands(Seq(command))
-    context.setEnvironment(getAppEnv(yarnConf))
+    context.setCommands(Seq(command).asJava)
+    context.setEnvironment(getAppEnv(yarnConf).asJava)
     context.setTokens(getToken(yarnConf, packagePath, configPath))
-    context.setLocalResources(getAMLocalResourcesMap(yarnConf, packagePath, configPath))
+    context.setLocalResources(getAMLocalResourcesMap(yarnConf, packagePath, configPath).asJava)
     context
   }
 
@@ -60,7 +61,9 @@ object ContainerLaunchContext {
     Map(Environment.CLASSPATH.name -> allPaths.map(_.trim).mkString(File.pathSeparator))
   }
 
-  private def getAMLocalResourcesMap(yarnConf: YarnConfiguration, packagePath: String, configPath: String): Map[String, LocalResource] = {
+  private def getAMLocalResourcesMap(
+      yarnConf: YarnConfiguration, packagePath: String, configPath: String)
+    : Map[String, LocalResource] = {
     val fs = getFs(yarnConf)
 
     Map(
@@ -70,8 +73,9 @@ object ContainerLaunchContext {
         LocalResourceType.FILE, LocalResourceVisibility.APPLICATION))
   }
 
-  private def newYarnAppResource(fs: YarnFileSystem, path: Path,
-                                 resourceType: LocalResourceType, vis: LocalResourceVisibility): LocalResource = {
+  private def newYarnAppResource(
+      fs: YarnFileSystem, path: Path,
+      resourceType: LocalResourceType, vis: LocalResourceVisibility): LocalResource = {
     val qualified = fs.makeQualified(path)
     val status = fs.getFileStatus(qualified)
     val resource = Records.newRecord(classOf[LocalResource])
@@ -83,7 +87,8 @@ object ContainerLaunchContext {
     resource
   }
 
-  private def getToken(yc: YarnConfiguration, packagePath: String, configPath: String): ByteBuffer = {
+  private def getToken(yc: YarnConfiguration, packagePath: String, configPath: String)
+    : ByteBuffer = {
     val credentials = UserGroupInformation.getCurrentUser.getCredentials
     val dob = new DataOutputBuffer
     val dirs = Array(new Path(packagePath), new Path(configPath))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
index e3dfb7f..acf09ac 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,11 +20,11 @@ package io.gearpump.experiments.yarn.glue
 
 import java.io.{InputStream, OutputStream}
 import java.net.ConnectException
+import scala.util.{Failure, Success, Try}
 
-import io.gearpump.util.LogUtil
 import org.apache.hadoop.fs.Path
 
-import scala.util.{Success, Failure, Try}
+import io.gearpump.util.LogUtil
 
 class FileSystem(yarnConfig: YarnConfig) {
 
@@ -33,26 +33,26 @@ class FileSystem(yarnConfig: YarnConfig) {
 
   private def LOG = LogUtil.getLogger(getClass)
 
-  def open(file: String): InputStream = exceptionHandler{
+  def open(file: String): InputStream = exceptionHandler {
     val path = new Path(file)
     fs.open(path)
   }
 
-  def create(file: String): OutputStream  = exceptionHandler{
+  def create(file: String): OutputStream = exceptionHandler {
     val path = new Path(file)
     fs.create(path)
   }
 
-  def exists(file: String): Boolean  = exceptionHandler{
+  def exists(file: String): Boolean = exceptionHandler {
     val path = new Path(file)
     fs.exists(path)
   }
 
   def name: String = {
-    fs.getName
+    fs.getUri.toString
   }
 
-  def getHomeDirectory: String  = {
+  def getHomeDirectory: String = {
     fs.getHomeDirectory.toString
   }
 
@@ -62,8 +62,10 @@ class FileSystem(yarnConfig: YarnConfig) {
       case Success(v) => v
       case Failure(ex) =>
         if (ex.isInstanceOf[ConnectException]) {
-          LOG.error("Please check whether we connect to the right HDFS file system, current file system is $name." +
-            "\n. Please copy all configs under $HADOOP_HOME/etc/hadoop into conf/yarnconf directory of Gearpump package, so that we can use the right File system.", ex)
+          LOG.error("Please check whether we connect to the right HDFS file system, " +
+            "current file system is $name." + "\n. Please copy all configs under " +
+            "$HADOOP_HOME/etc/hadoop into conf/yarnconf directory of Gearpump package, " +
+            "so that we can use the right File system.", ex)
         }
         throw ex
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
index f0a5fc7..3e7e668 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,12 +22,13 @@ import java.nio.ByteBuffer
 
 import akka.actor.ActorRef
 import com.typesafe.config.Config
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.util.LogUtil
 import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource}
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
+
+import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted
+import io.gearpump.experiments.yarn.glue.Records._
+import io.gearpump.util.LogUtil
 /**
  * Adapter for node manager client
  */
@@ -46,45 +47,49 @@ class NMClient(yarnConf: YarnConfig, config: Config) extends NMClientAsync.Callb
     client.start()
   }
 
-  private [glue]
-  override def onContainerStarted(containerId: YarnContainerId, allServiceResponse: java.util.Map[String, ByteBuffer]) {
+  private[glue]
+  override def onContainerStarted(
+      containerId: YarnContainerId, allServiceResponse: java.util.Map[String, ByteBuffer]) {
     LOG.info(s"Container started : $containerId, " + allServiceResponse)
     reportTo ! ContainerStarted(containerId)
   }
 
-  private [glue]
-  override def onContainerStatusReceived(containerId: YarnContainerId, containerStatus: YarnContainerStatus) {
+  private[glue]
+  override def onContainerStatusReceived(
+      containerId: YarnContainerId, containerStatus: YarnContainerStatus) {
     LOG.info(s"Container status received : $containerId, status $containerStatus")
   }
 
-  private [glue]
+  private[glue]
   override def onContainerStopped(containerId: YarnContainerId) {
     LOG.error(s"Container stopped : $containerId")
   }
 
-  private [glue]
+  private[glue]
   override def onGetContainerStatusError(containerId: YarnContainerId, throwable: Throwable) {
     LOG.error(s"Container exception : $containerId", throwable)
   }
 
-  private [glue]
+  private[glue]
   override def onStartContainerError(containerId: YarnContainerId, throwable: Throwable) {
     LOG.error(s"Container exception : $containerId", throwable)
   }
 
-  private [glue]
+  private[glue]
   override def onStopContainerError(containerId: YarnContainerId, throwable: Throwable) {
     LOG.error(s"Container exception : $containerId", throwable)
   }
 
-  def launchCommand(container: Container, command: String, packagePath: String, configPath: String): Unit = {
-    LOG.info(s"Launching command : $command on container:  ${container.getId}, host ip : ${container.getNodeId.getHost}")
+  def launchCommand(
+      container: Container, command: String, packagePath: String, configPath: String): Unit = {
+    LOG.info(s"Launching command : $command on container" +
+      s":  ${container.getId}, host ip : ${container.getNodeId.getHost}")
     val context = ContainerLaunchContext(yarnConf.conf, command, packagePath, configPath)
     client.startContainerAsync(container, context)
   }
 
   def stopContainer(containerId: ContainerId, nodeId: NodeId): Unit = {
-    LOG.info(s"Stop container ${containerId.toString} on node: ${nodeId.toString} " )
+    LOG.info(s"Stop container ${containerId.toString} on node: ${nodeId.toString} ")
     client.stopContainerAsync(containerId, nodeId)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
index c1d26f6..7b9d83c 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,17 @@
 
 package io.gearpump.experiments.yarn.glue
 
+import scala.collection.JavaConverters._
+
 import akka.actor.ActorRef
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered, ContainersAllocated, ContainersCompleted, ResourceManagerException, ShutdownApplication}
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.util.LogUtil
 import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, FinalApplicationStatus, NodeId => YarnNodeId, NodeReport, Priority, Resource => YarnResource}
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
 import org.slf4j.Logger
 
-import scala.collection.JavaConverters._
+import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered, ContainersAllocated, ContainersCompleted, ResourceManagerException, ShutdownApplication}
+import io.gearpump.experiments.yarn.glue.Records._
+import io.gearpump.util.LogUtil
 
 /**
  * Adapter for resource manager client
@@ -46,7 +47,7 @@ class RMClient(yarnConf: YarnConfig) extends AMRMClientAsync.CallbackHandler {
   }
 
   private def startAMRMClient: AMRMClientAsync[ContainerRequest] = {
-    val timeIntervalMs = 1000 //ms
+    val timeIntervalMs = 1000 // ms
     val amrmClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](timeIntervalMs, this)
     amrmClient.init(yarnConf.conf)
     amrmClient.start()
@@ -59,16 +60,19 @@ class RMClient(yarnConf: YarnConfig) extends AMRMClientAsync.CallbackHandler {
 
   private[glue]
   override def onContainersAllocated(containers: java.util.List[YarnContainer]) {
-    val newContainers = containers.asScala.toList.filterNot(container => allocatedContainers.contains(container.getId))
+    val newContainers = containers.asScala.toList.filterNot(container =>
+      allocatedContainers.contains(container.getId))
     allocatedContainers ++= newContainers.map(_.getId)
     LOG.info(s"New allocated ${newContainers.size} containers")
     reportTo ! ContainersAllocated(newContainers.map(yarnContainerToContainer(_)))
   }
 
   private[glue]
-  override def onContainersCompleted(completedContainers: java.util.List[YarnContainerStatus]): Unit = {
+  override def onContainersCompleted(completedContainers: java.util.List[YarnContainerStatus])
+    : Unit = {
     LOG.info(s"Got response from RM. Completed containers=${completedContainers.size()}")
-    reportTo ! ContainersCompleted(completedContainers.asScala.toList.map(yarnContainerStatusToContainerStatus(_)))
+    reportTo ! ContainersCompleted(
+      completedContainers.asScala.toList.map(yarnContainerStatusToContainerStatus(_)))
   }
 
   private[glue]