You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:33 UTC

[23/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
index 1869eab..7039b71 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,8 +21,7 @@ package io.gearpump.streaming.transaction.api
 import io.gearpump.Message
 
 /**
- * MessageDecoder decodes raw bytes to Message
- * It is usually written by end user and
+ * MessageDecoder decodes raw bytes to Message. It is usually written by end user and
  * passed into TimeReplayableSource
  */
 trait MessageDecoder extends java.io.Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
index 29656dd..412ddcc 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,28 +18,27 @@
 
 package io.gearpump.streaming.transaction.api
 
-import io.gearpump.{Message, TimeStamp}
-
 import scala.util.Try
 
+import io.gearpump.{Message, TimeStamp}
+
 /**
- * filter offsets and store the mapping from timestamp to offset
+ * Filters offsets and stores the mapping from timestamp to offset
  */
 trait MessageFilter {
   def filter(messageAndOffset: (Message, Long)): Option[Message]
 }
 
 /**
- * resolve timestamp to offset by look up the underlying storage
+ * Resolves timestamp to offset by looking up the underlying storage
  */
 trait OffsetTimeStampResolver {
   def resolveOffset(time: TimeStamp): Try[Long]
 }
 
 /**
- * manages message's offset on TimeReplayableSource and timestamp
+ * Manages message's offset on TimeReplayableSource and timestamp
  */
 trait OffsetManager extends MessageFilter with OffsetTimeStampResolver {
   def close(): Unit
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
index aa9966e..fa7161c 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,10 +18,10 @@
 
 package io.gearpump.streaming.transaction.api
 
-import io.gearpump.TimeStamp
-
 import scala.util.Try
 
+import io.gearpump.TimeStamp
+
 object OffsetStorage {
 
   /**
@@ -47,15 +47,17 @@ object OffsetStorage {
  */
 trait OffsetStorage {
   /**
-   * try to look up the time in the OffsetStorage
-   * return the corresponding Offset if the time is
-   * in the range of stored TimeStamps or one of the
-   * failure info (StorageEmpty, Overflow, Underflow)
+   * Tries to look up the time in the OffsetStorage. Returns the corresponding Offset if the
+   * time is in the range of stored TimeStamps, or one of the failure infos (StorageEmpty,
+   * Overflow, Underflow) otherwise.
+   *
    * @param time the time to look for
    * @return the corresponding offset if the time is in the range, otherwise failure
    */
   def lookUp(time: TimeStamp): Try[Array[Byte]]
+
   def append(time: TimeStamp, offset: Array[Byte]): Unit
+
   def close(): Unit
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
index b6c1e6c..50711ee 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,11 +21,10 @@ package io.gearpump.streaming.transaction.api
 import io.gearpump.streaming.source.DataSource
 
 /**
- * AT-LEAST-ONCE API
+ * AT-LEAST-ONCE API. Represents a data source which allows replaying.
  *
- * subclass should be able to replay messages on recovery from the time
- * when an application crashed
+ * Subclass should be able to replay messages on recovery from the time
+ * when an application crashed.
  */
 trait TimeReplayableSource extends DataSource
 
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
index 7df508c..7c34e1a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,10 +21,9 @@ package io.gearpump.streaming.transaction.api
 import io.gearpump.{Message, TimeStamp}
 
 /**
- * TimeStampFilter filters message comparing its TimeStamp with the predicate.
+ * TimeStampFilter filters out messages that are obsolete.
  */
 trait TimeStampFilter extends java.io.Serializable {
   def filter(msg: Message, predicate: TimeStamp): Option[Message]
 }
 
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
index 5edcf38..c2ac32a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,16 +19,22 @@
 package io.gearpump.streaming.util
 
 import akka.actor.{ActorPath, ActorRef}
+
 import io.gearpump.streaming.task.TaskId
 
 object ActorPathUtil {
 
-  def executorActorName(executorId: Int) = executorId.toString
+  def executorActorName(executorId: Int): String = executorId.toString
 
-  def taskActorName(taskId: TaskId) = s"processor_${taskId.processorId}_task_${taskId.index}"
+  def taskActorName(taskId: TaskId): String = {
+    s"processor_${taskId.processorId}_task_${taskId.index}"
+  }
 
   def taskActorPath(appMaster: ActorRef, executorId: Int, taskId: TaskId): ActorPath = {
-    appMaster.path.child(executorManagerActorName).child(executorActorName(executorId)).child(taskActorName(taskId))
+    val executorManager = appMaster.path.child(executorManagerActorName)
+    val executor = executorManager.child(executorActorName(executorId))
+    val task = executor.child(taskActorName(taskId))
+    task
   }
 
   def executorManagerActorName: String = "executors"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
index 0207fb3..13b8e34 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,13 +18,14 @@
 
 package io.gearpump.streaming
 
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
 import io.gearpump.partitioner.PartitionerDescription
 import io.gearpump.streaming.task.TaskId
 import io.gearpump.util.Graph
 import io.gearpump.util.Graph.Node
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
 
 class DAGSpec extends PropSpec with PropertyChecks with Matchers {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
index a5067f2..7938415 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,11 +17,12 @@
  */
 package io.gearpump.streaming
 
-import io.gearpump.streaming.task._
-import io.gearpump.transport.netty.WrappedChannelBuffer
 import org.jboss.netty.buffer.{ChannelBufferOutputStream, ChannelBuffers}
 import org.scalatest.{Matchers, WordSpec}
 
+import io.gearpump.streaming.task._
+import io.gearpump.transport.netty.WrappedChannelBuffer
+
 class MessageSerializerSpec extends WordSpec with Matchers {
 
   def testSerializer[T](obj: T, taskMessageSerializer: TaskMessageSerializer[T]): T = {
@@ -32,7 +33,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
     taskMessageSerializer.read(input)
   }
 
-  "SerializedMessageSerializer"  should {
+  "SerializedMessageSerializer" should {
     "serialize and deserialize SerializedMessage properly" in {
       val serializer = new SerializedMessageSerializer
       val data = new Array[Byte](256)
@@ -43,7 +44,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
     }
   }
 
-  "TaskIdSerializer"  should {
+  "TaskIdSerializer" should {
     "serialize and deserialize TaskId properly" in {
       val taskIdSerializer = new TaskIdSerializer
       val taskId = TaskId(1, 3)
@@ -51,7 +52,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
     }
   }
 
-  "AckRequestSerializer"  should {
+  "AckRequestSerializer" should {
     "serialize and deserialize AckRequest properly" in {
       val serializer = new AckRequestSerializer
       val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024)
@@ -59,7 +60,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
     }
   }
 
-  "InitialAckRequestSerializer"  should {
+  "InitialAckRequestSerializer" should {
     "serialize and deserialize AckRequest properly" in {
       val serializer = new InitialAckRequestSerializer
       val ackRequest = InitialAckRequest(TaskId(1, 2), 1024)
@@ -67,7 +68,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
     }
   }
 
-  "AckSerializer"  should {
+  "AckSerializer" should {
     "serialize and deserialize Ack properly" in {
       val serializer = new AckSerializer
       val ack = Ack(TaskId(1, 2), 1024, 1023, 1799)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala b/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
index 6577718..40310f7 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,14 +17,12 @@
  */
 package io.gearpump.streaming
 
-import akka.actor.{ActorSystem, Actor}
-import akka.testkit.{TestProbe, TestActorRef}
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.streaming.task.TaskId
+import akka.actor.{Actor, ActorSystem}
+import akka.testkit.TestActorRef
+import org.mockito.{ArgumentMatcher, Matchers, Mockito}
+
 import io.gearpump.cluster.TestUtil
-import org.mockito.{Mockito, ArgumentMatcher}
-import org.mockito.Mockito
-import org.mockito.Matchers
+import io.gearpump.streaming.task.{TaskContext, TaskId}
 
 object MockUtil {
 
@@ -39,7 +37,7 @@ object MockUtil {
     context
   }
 
-  def argMatch[T](func: T => Boolean) : T = {
+  def argMatch[T](func: T => Boolean): T = {
     Matchers.argThat(new ArgumentMatcher[T] {
       override def matches(param: Any): Boolean = {
         val mesage = param.asInstanceOf[T]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala b/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
index 6866907..2ea8b84 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,10 +19,11 @@ package io.gearpump.streaming
 
 import akka.actor._
 import akka.testkit.TestActorRef
+
 import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
 import io.gearpump.cluster.appmaster.AppMasterRuntimeInfo
 import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.{MiniCluster, AppDescription, AppMasterContext, UserConfig}
+import io.gearpump.cluster.{AppDescription, AppMasterContext, MiniCluster, UserConfig}
 import io.gearpump.streaming.appmaster.AppMaster
 import io.gearpump.util.Graph
 
@@ -33,7 +34,8 @@ object StreamingTestUtil {
   def startAppMaster(miniCluster: MiniCluster, appId: Int): TestActorRef[AppMaster] = {
 
     implicit val actorSystem = miniCluster.system
-    val masterConf = AppMasterContext(appId, testUserName, Resource(1),  null, None,miniCluster.mockMaster,AppMasterRuntimeInfo(appId, appName = appId.toString))
+    val masterConf = AppMasterContext(appId, testUserName, Resource(1), null,
+      None, miniCluster.mockMaster, AppMasterRuntimeInfo(appId, appName = appId.toString))
 
     val app = StreamApplication("test", Graph.empty, UserConfig.empty)
     val appDescription = AppDescription(app.name, app.appMaster.getName, app.userConfig)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
index d1d7006..90fb8ab 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,9 +17,13 @@
  */
 package io.gearpump.streaming.appmaster
 
+import scala.concurrent.duration._
+
 import akka.actor.{ActorRef, Props}
 import akka.testkit.{TestActorRef, TestProbe}
-import io.gearpump.{WorkerId, Message}
+import org.scalatest._
+
+import io.gearpump.Message
 import io.gearpump.cluster.AppMasterToMaster._
 import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
 import io.gearpump.cluster.ClientToMaster.ShutdownApplication
@@ -29,19 +33,16 @@ import io.gearpump.cluster._
 import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
 import io.gearpump.cluster.master.MasterProxy
 import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.jarstore.FilePath
 import io.gearpump.partitioner.HashPartitioner
 import io.gearpump.streaming.task.{StartTime, TaskContext, _}
 import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.util.Graph
 import io.gearpump.util.Graph._
-import org.scalatest._
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
 
 class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
-  override def config = TestUtil.DEFAULT_CONFIG
+  protected override def config = TestUtil.DEFAULT_CONFIG
 
   var appMaster: ActorRef = null
 
@@ -63,7 +64,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
   var appMasterContext: AppMasterContext = null
   var appMasterRuntimeInfo: AppMasterRuntimeInfo = null
 
-  override def beforeEach() = {
+  override def beforeEach(): Unit = {
     startActorSystem()
 
     mockTask = TestProbe()(getActorSystem)
@@ -76,50 +77,54 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
     implicit val system = getActorSystem
     conf = UserConfig.empty.withValue(AppMasterSpec.MASTER, mockMaster.ref)
     val mockJar = AppJar("for_test", FilePath("path"))
-    appMasterContext = AppMasterContext(appId, "test", resource, null, Some(mockJar), mockMaster.ref, appMasterRuntimeInfo)
+    appMasterContext = AppMasterContext(appId, "test", resource, null, Some(mockJar),
+      mockMaster.ref, appMasterRuntimeInfo)
     val graph = Graph(taskDescription1 ~ partitioner ~> taskDescription2)
     val streamApp = StreamApplication("test", graph, conf)
     appDescription = Application.ApplicationToAppDescription(streamApp)
     import scala.concurrent.duration._
-    mockMasterProxy = getActorSystem.actorOf(
-      Props(new MasterProxy(List(mockMaster.ref.path), 30 seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
+    mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path),
+      30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
     TestActorRef[AppMaster](
-      AppMasterRuntimeEnvironment.props(List(mockMasterProxy.path), appDescription, appMasterContext))(getActorSystem)
+      AppMasterRuntimeEnvironment.props(List(mockMasterProxy.path), appDescription,
+        appMasterContext))(getActorSystem)
 
-    val registerAppMaster = mockMaster.receiveOne(15 seconds)
+    val registerAppMaster = mockMaster.receiveOne(15.seconds)
     assert(registerAppMaster.isInstanceOf[RegisterAppMaster])
     appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster
 
     mockMaster.reply(AppMasterRegistered(appId))
-    mockMaster.expectMsg(15 seconds, GetAppData(appId, "DAG"))
+    mockMaster.expectMsg(15.seconds, GetAppData(appId, "DAG"))
     mockMaster.reply(GetAppDataResult("DAG", null))
-    mockMaster.expectMsg(15 seconds, GetAppData(appId, "startClock"))
+    mockMaster.expectMsg(15.seconds, GetAppData(appId, "startClock"))
 
     mockMaster.reply(GetAppDataResult("startClock", 0L))
 
-    mockMaster.expectMsg(15 seconds, RequestResource(appId, ResourceRequest(Resource(4), workerId = WorkerId.unspecified)))
+    mockMaster.expectMsg(15.seconds, RequestResource(appId, ResourceRequest(Resource(4),
+      workerId = WorkerId.unspecified)))
   }
 
-  override def afterEach() = {
+  override def afterEach(): Unit = {
     shutdownActorSystem()
   }
 
   "AppMaster" should {
     "kill it self when allocate resource time out" in {
-      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2), mockWorker.ref, workerId))))
-      mockMaster.expectMsg(60 seconds, ShutdownApplication(appId))
+      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2),
+        mockWorker.ref, workerId))))
+      mockMaster.expectMsg(60.seconds, ShutdownApplication(appId))
     }
 
     "reschedule the resource when the worker reject to start executor" in {
       val resource = Resource(4)
-      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker.ref, workerId))))
+      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource,
+        mockWorker.ref, workerId))))
       mockWorker.expectMsgClass(classOf[LaunchExecutor])
       mockWorker.reply(ExecutorLaunchRejected(""))
       mockMaster.expectMsg(RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)))
     }
 
     "find a new master when lost connection with master" in {
-      println(config.getList("akka.loggers"))
 
       val watcher = TestProbe()(getActorSystem)
       watcher.watch(mockMasterProxy)
@@ -130,74 +135,74 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
       Thread.sleep(2000)
 
       import scala.concurrent.duration._
-      mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path), 30 seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
-      mockMaster.expectMsgClass(15 seconds, classOf[RegisterAppMaster])
-    }
-
-    /*
-
-     TODO: This test is failing on Travis randomly
-     We have not identifed the root cause.
-     Check: https://travis-ci.org/intel-hadoop/gearpump/builds/56826843
-     Issue tracker: https://github.com/intel-hadoop/gearpump/issues/733
-
-    "launch executor and task properly" in {
-      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), mockWorker.ref, workerId))))
-      mockWorker.expectMsgClass(classOf[LaunchExecutor])
-
-      val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG)
-      mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString))
-      for (i <- 1 to 4) {
-        mockMaster.expectMsg(10 seconds, AppMasterSpec.TaskStarted)
-      }
-
-      //clock status: task(0,0) -> 1, task(0,1)->0, task(1, 0)->0, task(1,1)->0
-      appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref)
-
-      //there is no further upstream, so the upstreamMinClock is Long.MaxValue
-      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
-
-      // check min clock
-      appMaster.tell(GetLatestMinClock, mockTask.ref)
-      mockTask.expectMsg(LatestMinClock(0))
-
-
-      //clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0
-      appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref)
-
-      //there is no further upstream, so the upstreamMinClock is Long.MaxValue
-      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
-
-      // check min clock
-      appMaster.tell(GetLatestMinClock, mockTask.ref)
-      mockTask.expectMsg(LatestMinClock(0))
-
-      //clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->0
-      appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref)
-
-      // min clock of processor 0 (Task(0, 0) and Task(0, 1))
-      mockTask.expectMsg(UpstreamMinClock(1))
-
-      // check min clock
-      appMaster.tell(GetLatestMinClock, mockTask.ref)
-      mockTask.expectMsg(LatestMinClock(0))
-
-      //clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->1
-      appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref)
-
-      // min clock of processor 0 (Task(0, 0) and Task(0, 1))
-      mockTask.expectMsg(UpstreamMinClock(1))
-
-      // check min clock
-      appMaster.tell(GetLatestMinClock, mockTask.ref)
-      mockTask.expectMsg(LatestMinClock(1))
-
-      //shutdown worker and all executor on this work, expect appmaster to ask for new resources
-      workerSystem.shutdown()
-      mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(4), relaxation = Relaxation.ONEWORKER)))
+      mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path),
+        30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
+      mockMaster.expectMsgClass(15.seconds, classOf[RegisterAppMaster])
     }
-**/
 
+    //    // TODO: This test is failing on Travis randomly
+    //    // We have not identified the root cause.
+    //    // Check: https://travis-ci.org/intel-hadoop/gearpump/builds/56826843
+    //    // Issue tracker: https://github.com/intel-hadoop/gearpump/issues/733
+    //
+    //    "launch executor and task properly" in {
+    //      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), mockWorker.ref,
+    //       workerId))))
+    //      mockWorker.expectMsgClass(classOf[LaunchExecutor])
+    //
+    //      val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG)
+    //      mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString))
+    //      for (i <- 1 to 4) {
+    //        mockMaster.expectMsg(10 seconds, AppMasterSpec.TaskStarted)
+    //      }
+    //
+    //      // clock status: task(0,0) -> 1, task(0,1)->0, task(1, 0)->0, task(1,1)->0
+    //      appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref)
+    //
+    //      // there is no further upstream, so the upstreamMinClock is Long.MaxValue
+    //      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
+    //
+    //      // check min clock
+    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
+    //      mockTask.expectMsg(LatestMinClock(0))
+    //
+    //
+    //      // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0
+    //      appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref)
+    //
+    //      // there is no further upstream, so the upstreamMinClock is Long.MaxValue
+    //      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
+    //
+    //      // check min clock
+    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
+    //      mockTask.expectMsg(LatestMinClock(0))
+    //
+    //      // Clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->0
+    //      appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref)
+    //
+    //      // Min clock of processor 0 (Task(0, 0) and Task(0, 1))
+    //      mockTask.expectMsg(UpstreamMinClock(1))
+    //
+    //      // check min clock
+    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
+    //      mockTask.expectMsg(LatestMinClock(0))
+    //
+    //      // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->1
+    //      appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref)
+    //
+    //      // min clock of processor 0 (Task(0, 0) and Task(0, 1))
+    //      mockTask.expectMsg(UpstreamMinClock(1))
+    //
+    //      // check min clock
+    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
+    //      mockTask.expectMsg(LatestMinClock(1))
+    //
+    //      // shutdown worker and all executors on this worker, expect appmaster to ask
+    //      // for new resources
+    //      workerSystem.shutdown()
+    //      mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(4), relaxation =
+    //        Relaxation.ONEWORKER)))
+    //    }
   }
 
   def ignoreSaveAppData: PartialFunction[Any, Boolean] = {
@@ -212,7 +217,7 @@ object AppMasterSpec {
   val MOCK_MASTER_PROXY = "mockMasterProxy"
 }
 
-class TaskA(taskContext : TaskContext, userConf : UserConfig) extends Task(taskContext, userConf) {
+class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
 
   val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get
   override def onStart(startTime: StartTime): Unit = {
@@ -222,7 +227,7 @@ class TaskA(taskContext : TaskContext, userConf : UserConfig) extends Task(taskC
   override def onNext(msg: Message): Unit = {}
 }
 
-class TaskB(taskContext : TaskContext, userConf : UserConfig) extends Task(taskContext, userConf) {
+class TaskB(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
 
   val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get
   override def onStart(startTime: StartTime): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
index a451053..01744a3 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,24 +17,24 @@
  */
 package io.gearpump.streaming.appmaster
 
+import scala.concurrent.{Future, Promise}
+
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import io.gearpump.streaming.task.{GetStartClock, UpstreamMinClock, GetLatestMinClock}
-import io.gearpump.cluster.{UserConfig, TestUtil}
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import io.gearpump.cluster.{TestUtil, UserConfig}
 import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
 import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess, HealthChecker, ProcessorClock}
 import io.gearpump.streaming.appmaster.ClockServiceSpec.Store
 import io.gearpump.streaming.storage.AppDataStore
-import io.gearpump.streaming.task._
-import io.gearpump.streaming.{LifeTime, DAG, ProcessorDescription}
+import io.gearpump.streaming.task.{GetLatestMinClock, GetStartClock, UpstreamMinClock, _}
+import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription}
 import io.gearpump.util.Graph
 import io.gearpump.util.Graph._
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import scala.concurrent.{Future, Promise}
 
 class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
-  with WordSpecLike with Matchers with BeforeAndAfterAll{
+  with WordSpecLike with Matchers with BeforeAndAfterAll {
 
   def this() = this(ActorSystem("ClockServiceSpec", TestUtil.DEFAULT_CONFIG))
 
@@ -50,46 +50,48 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
   "The ClockService" should {
     "maintain a global view of message timestamp in the application" in {
       val store = new Store()
-      val startClock  = 100L
+      val startClock = 100L
       store.put(ClockService.START_CLOCK, startClock)
       val clockService = system.actorOf(Props(new ClockService(dag, store)))
       clockService ! GetLatestMinClock
       expectMsg(LatestMinClock(startClock))
 
-      //task(0,0): clock(101); task(1,0): clock(100)
+      // task(0,0): clock(101); task(1,0): clock(100)
       clockService ! UpdateClock(TaskId(0, 0), 101)
 
-      // there is no upstream, so pick Long.MaxValue
+      // There is no upstream, so pick Long.MaxValue
       expectMsg(UpstreamMinClock(Long.MaxValue))
 
-      // min clock is updated
+      // Min clock is updated
       clockService ! GetLatestMinClock
       expectMsg(LatestMinClock(100))
 
-
-      //task(0,0): clock(101); task(1,0): clock(101)
+      // task(0,0): clock(101); task(1,0): clock(101)
       clockService ! UpdateClock(TaskId(1, 0), 101)
 
-      //upstream is Task(0, 0), 101
+      // Upstream is Task(0, 0), 101
       expectMsg(UpstreamMinClock(101))
 
-      // min clock is updated
+      // Min clock is updated
       clockService ! GetLatestMinClock
       expectMsg(LatestMinClock(101))
     }
 
     "act on ChangeToNewDAG and make sure downstream clock smaller than upstreams" in {
       val store = new Store()
-      val startClock  = 100L
+      val startClock = 100L
       store.put(ClockService.START_CLOCK, startClock)
       val clockService = system.actorOf(Props(new ClockService(dag, store)))
       val task = TestProbe()
       clockService.tell(UpdateClock(TaskId(0, 0), 200), task.ref)
       task.expectMsgType[UpstreamMinClock]
 
-      val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, parallelism = 1)
-      val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName, parallelism = 1)
-      val task5 = ProcessorDescription(id = 5, taskClass = classOf[TaskActor].getName, parallelism = 1)
+      val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName,
+        parallelism = 1)
+      val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName,
+        parallelism = 1)
+      val task5 = ProcessorDescription(id = 5, taskClass = classOf[TaskActor].getName,
+        parallelism = 1)
       val dagAddMiddleNode = DAG(Graph(
         task1 ~ hash ~> task2,
         task1 ~ hash ~> task3,
@@ -100,12 +102,12 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
       val user = TestProbe()
       clockService.tell(ChangeToNewDAG(dagAddMiddleNode), user.ref)
 
-      val clocks = user.expectMsgPF(){
+      val clocks = user.expectMsgPF() {
         case ChangeToNewDAGSuccess(clocks) =>
           clocks
       }
 
-      // for intermediate task, pick its upstream as initial clock
+      // For intermediate task, pick its upstream as initial clock
       assert(clocks(task3.id) == clocks(task1.id))
 
       // For sink task, pick its upstream as initial clock
@@ -117,7 +119,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
 
     "maintain global checkpoint time" in {
       val store = new Store()
-      val startClock  = 100L
+      val startClock = 100L
       store.put(ClockService.START_CLOCK, startClock)
       val clockService = system.actorOf(Props(new ClockService(dag, store)))
       clockService ! UpdateClock(TaskId(0, 0), 200L)
@@ -129,8 +131,10 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
       expectMsg(StartClock(200L))
 
       val conf = UserConfig.empty.withBoolean("state.checkpoint.enable", true)
-      val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, parallelism = 1, taskConf = conf)
-      val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName, parallelism = 1, taskConf = conf)
+      val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName,
+        parallelism = 1, taskConf = conf)
+      val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName,
+        parallelism = 1, taskConf = conf)
       val dagWithStateTasks = DAG(Graph(
         task1 ~ hash ~> task2,
         task1 ~ hash ~> task3,
@@ -184,14 +188,14 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
       val sourceClock = new ProcessorClock(0, LifeTime.Immortal, 1)
       sourceClock.init(0L)
       val sink = ProcessorDescription(id = 1, taskClass = null, parallelism = 1)
-      val sinkClock = new ProcessorClock(1,LifeTime.Immortal, 1)
+      val sinkClock = new ProcessorClock(1, LifeTime.Immortal, 1)
       sinkClock.init(0L)
       val graph = Graph.empty[ProcessorDescription, PartitionerDescription]
       graph.addVertex(source)
       graph.addVertex(sink)
       graph.addEdge(source, PartitionerDescription(null), sink)
       val dag = DAG(graph)
-      val clocks = Map (
+      val clocks = Map(
         0 -> sourceClock,
         1 -> sinkClock
       )
@@ -199,30 +203,29 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
       sourceClock.updateMinClock(0, 100L)
       sinkClock.updateMinClock(0, 100L)
 
-      // clock advance from 0 to 100, there is no stalling.
+      // Clock advances from 0 to 100, there is no stalling.
       healthChecker.check(currentMinClock = 100, clocks, dag, 200)
       healthChecker.getReport.stallingTasks shouldBe List.empty[TaskId]
 
-      // clock not advancing.
-      // pasted time exceed the stalling threshold, report stalling
+      // Clock not advancing.
+      // Elapsed time exceeds the stalling threshold, report stalling
       healthChecker.check(currentMinClock = 100, clocks, dag, 1300)
 
-      // the source task is stalling the clock
+      // The source task is stalling the clock
       healthChecker.getReport.stallingTasks shouldBe List(TaskId(0, 0))
 
-      // advance the source clock
+      // Advance the source clock
       sourceClock.updateMinClock(0, 101L)
       healthChecker.check(currentMinClock = 100, clocks, dag, 1300)
-      // the sink task is stalling the clock
+      // The sink task is stalling the clock
       healthChecker.getReport.stallingTasks shouldBe List(TaskId(1, 0))
     }
   }
-
 }
 
 object ClockServiceSpec {
 
-  class Store extends AppDataStore{
+  class Store extends AppDataStore {
 
     private var map = Map.empty[String, Any]
 
@@ -231,7 +234,7 @@ object ClockServiceSpec {
       Promise.successful(value).future
     }
 
-    def get(key: String) : Future[Any] = {
+    def get(key: String): Future[Any] = {
       Promise.successful(map.get(key).getOrElse(null)).future
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
index 2750cc5..fb633f9 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
@@ -18,8 +18,13 @@
 
 package io.gearpump.streaming.appmaster
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.TestProbe
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
 import io.gearpump.cluster.{TestUtil, UserConfig}
 import io.gearpump.partitioner.{HashPartitioner, Partitioner}
 import io.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange}
@@ -27,7 +32,6 @@ import io.gearpump.streaming.task.{Subscriber, TaskActor}
 import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, StreamApplication}
 import io.gearpump.util.Graph
 import io.gearpump.util.Graph._
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
 class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
 
@@ -94,8 +98,8 @@ class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
   }
 
   override def afterAll {
-    system.shutdown()
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   override def beforeAll {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
index 9121a22..a57a1ae 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,39 +18,43 @@
 
 package io.gearpump.streaming.appmaster
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor._
 import akka.testkit.TestProbe
 import com.typesafe.config.ConfigFactory
-import io.gearpump.streaming.appmaster.ExecutorManager.ExecutorStarted
-import io.gearpump.{WorkerId, TestProbeUtil}
+import org.scalatest._
+
+import io.gearpump.TestProbeUtil
 import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
 import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.{ExecutorSystem, WorkerInfo}
 import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems}
+import io.gearpump.cluster.appmaster.{ExecutorSystem, WorkerInfo}
 import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.jarstore.FilePath
 import io.gearpump.streaming.ExecutorId
 import io.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor
-import io.gearpump.streaming.appmaster.ExecutorManager._
+import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, _}
 import io.gearpump.streaming.appmaster.ExecutorManagerSpec.StartExecutorActorPlease
 import io.gearpump.util.ActorSystemBooter.BindLifeCycle
 import io.gearpump.util.LogUtil
-import org.scalatest._
 
-class ExecutorManagerSpec  extends FlatSpec with Matchers with BeforeAndAfterAll {
+class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
   implicit var system: ActorSystem = null
 
   private val LOG = LogUtil.getLogger(getClass)
   private val appId = 0
   private val resource = Resource(10)
 
-  override def beforeAll = {
+  override def beforeAll(): Unit = {
     system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
   }
 
-  override def afterAll = {
-    system.shutdown()
-    system.awaitTermination()
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   private def startExecutorSystems = {
@@ -69,17 +73,18 @@ class ExecutorManagerSpec  extends FlatSpec with Matchers with BeforeAndAfterAll
       executor.ref ! StartExecutorActorPlease
       TestProbeUtil.toProps(executor)
     }
-    val executorManager = system.actorOf(Props(new ExecutorManager(userConfig, appMasterContext, executorFactory, ConfigFactory.empty, appName)))
+    val executorManager = system.actorOf(Props(new ExecutorManager(userConfig, appMasterContext,
+      executorFactory, ConfigFactory.empty, appName)))
 
     taskManager.send(executorManager, SetTaskManager(taskManager.ref))
     val resourceRequest = Array(ResourceRequest(resource, WorkerId.unspecified))
 
-    //start executors
+    // Starts executors
     taskManager.send(executorManager, StartExecutors(resourceRequest, appJar.get))
 
-    //ask master to start executor systems
+    // Asks master to start executor systems
     import scala.concurrent.duration._
-    val startExecutorSystem = master.receiveOne(5 seconds).asInstanceOf[StartExecutorSystems]
+    val startExecutorSystem = master.receiveOne(5.seconds).asInstanceOf[StartExecutorSystems]
     assert(startExecutorSystem.resources == resourceRequest)
     import startExecutorSystem.executorSystemConfig.{classPath, executorAkkaConfig, jar, jvmArguments, username => returnedUserName}
     assert(startExecutorSystem.resources == resourceRequest)
@@ -94,7 +99,7 @@ class ExecutorManagerSpec  extends FlatSpec with Matchers with BeforeAndAfterAll
   }
 
   it should "report timeout to taskManager" in {
-    import ExecutorManager._
+    import io.gearpump.streaming.appmaster.ExecutorManager._
     val (master, executor, taskManager, executorManager) = startExecutorSystems
     master.reply(StartExecutorSystemTimeout)
     taskManager.expectMsg(StartExecutorsTimeOut)
@@ -110,30 +115,31 @@ class ExecutorManagerSpec  extends FlatSpec with Matchers with BeforeAndAfterAll
       resource, workerInfo)
     master.reply(ExecutorSystemStarted(executorSystem, None))
     import scala.concurrent.duration._
-    val bindLifeWith = executorSystemDaemon.receiveOne(3 seconds).asInstanceOf[BindLifeCycle]
+    val bindLifeWith = executorSystemDaemon.receiveOne(3.seconds).asInstanceOf[BindLifeCycle]
     val proxyExecutor = bindLifeWith.actor
     executor.expectMsg(StartExecutorActorPlease)
 
     val executorId = 0
 
-    //register executor
-    executor.send(executorManager, RegisterExecutor(proxyExecutor, executorId, resource, workerInfo))
+    // Registers executor
+    executor.send(executorManager, RegisterExecutor(proxyExecutor, executorId,
+      resource, workerInfo))
     taskManager.expectMsgType[ExecutorStarted]
 
-    //broad message to childs
+    // Broadcasts message to children
     taskManager.send(executorManager, BroadCast("broadcast"))
     executor.expectMsg("broadcast")
 
-    //unicast
+    // Unicast
     taskManager.send(executorManager, UniCast(executorId, "unicast"))
     executor.expectMsg("unicast")
 
-    //update executor resource status
+    // Updates executor resource status
     val usedResource = Resource(5)
     executorManager ! ExecutorResourceUsageSummary(Map(executorId -> usedResource))
     worker.expectMsg(ChangeExecutorResource(appId, executorId, resource - usedResource))
 
-    //watch for executor termination
+    // Watches for executor termination
     system.stop(executor.ref)
     LOG.info("Shutting down executor, and wait taskManager to get notified")
     taskManager.expectMsg(ExecutorStopped(executorId))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
index 35659d6..9d4432a 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
@@ -15,12 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.streaming.appmaster
 
+import scala.concurrent.duration._
+
+import org.scalatest.{Matchers, WordSpec}
+
 import io.gearpump.streaming.executor.ExecutorRestartPolicy
 import io.gearpump.streaming.task.TaskId
-import org.scalatest.{Matchers, WordSpec}
-import scala.concurrent.duration._
 
 class ExecutorRestartPolicySpec extends WordSpec with Matchers {
 
@@ -29,7 +32,8 @@ class ExecutorRestartPolicySpec extends WordSpec with Matchers {
       val executorId1 = 1
       val executorId2 = 2
       val taskId = TaskId(0, 0)
-      val executorSupervisor = new ExecutorRestartPolicy(maxNrOfRetries = 3, withinTimeRange = 1 seconds)
+      val executorSupervisor = new ExecutorRestartPolicy(
+        maxNrOfRetries = 3, withinTimeRange = 1.seconds)
       executorSupervisor.addTaskToExecutor(executorId1, taskId)
       assert(executorSupervisor.allowRestartExecutor(executorId1))
       assert(executorSupervisor.allowRestartExecutor(executorId1))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
index d023be8..6cd70d9 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,20 @@
 
 package io.gearpump.streaming.appmaster
 
-import akka.actor.{Props, ActorSystem}
+import scala.concurrent.Await
+
+import akka.actor.{ActorSystem, Props}
 import akka.testkit.TestProbe
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
 import io.gearpump.cluster.ClientToMaster.QueryHistoryMetrics
-import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem}
+import io.gearpump.cluster.MasterToClient.HistoryMetrics
 import io.gearpump.cluster.TestUtil
-import io.gearpump.metrics.Metrics.{Histogram, Meter, Counter}
+import io.gearpump.metrics.Metrics.{Counter, Histogram, Meter}
 import io.gearpump.util.HistoryMetricsService
-import HistoryMetricsService._
-import org.scalatest.{BeforeAndAfterEach, Matchers, FlatSpec}
+import io.gearpump.util.HistoryMetricsService._
 
-class HistoryMetricsServiceSpec  extends FlatSpec with Matchers with BeforeAndAfterEach {
+class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
 
   val count = 2
   val intervalMs = 10
@@ -44,30 +47,30 @@ class HistoryMetricsServiceSpec  extends FlatSpec with Matchers with BeforeAndAf
     val store = new SingleValueMetricsStore(count, intervalMs)
 
     var now = 0L
-    //only 1 data point will be kept in @intervalMs
+    // Only 1 data point will be kept in @intervalMs
     store.add(Counter("count", 1), now)
     store.add(Counter("count", 2), now)
 
     now = now + intervalMs + 1
 
-    //only 1 data point will be kept in @intervalMs
+    // Only 1 data point will be kept in @intervalMs
     store.add(Counter("count", 3), now)
     store.add(Counter("count", 4), now)
 
     now = now + intervalMs + 1
 
-    //only 1 data point will be kept in @intervalMs
-    //expire oldest data point, because we only keep @count records
+    // Only 1 data point will be kept in @intervalMs
+    // Expires the oldest data point, because we only keep @count records
     store.add(Counter("count", 5), now)
     store.add(Counter("count", 6), now)
 
     val result = store.read
     assert(result.size == count)
 
-    //the oldest value is expired
+    // The oldest value is expired
     assert(result.head.value.asInstanceOf[Counter].value == 3L)
 
-    //the newest value is inserted
+    // The newest value is inserted
     assert(result.last.value.asInstanceOf[Counter].value == 5L)
   }
 
@@ -119,7 +122,8 @@ class HistoryMetricsServiceSpec  extends FlatSpec with Matchers with BeforeAndAf
     assert(store.readHistory.map(_.value) == List(a))
   }
 
-  "HistoryMetricsService" should "retain lastest metrics data and allow user to query metrics by path" in {
+  "HistoryMetricsService" should
+    "retain lastest metrics data and allow user to query metrics by path" in {
     implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
     val appId = 0
     val service = system.actorOf(Props(new HistoryMetricsService("app0", config)))
@@ -129,10 +133,10 @@ class HistoryMetricsServiceSpec  extends FlatSpec with Matchers with BeforeAndAf
 
     val client = TestProbe()
 
-    // filter metrics with path "metric.counter"
+    // Filters metrics with path "metric.counter"
     client.send(service, QueryHistoryMetrics("metric.counter"))
     import scala.concurrent.duration._
-    client.expectMsgPF(3 seconds) {
+    client.expectMsgPF(3.seconds) {
       case history: HistoryMetrics =>
         assert(history.path == "metric.counter")
         val metricList = history.metrics
@@ -141,9 +145,9 @@ class HistoryMetricsServiceSpec  extends FlatSpec with Matchers with BeforeAndAf
         )
     }
 
-    // filter metrics with path "metric.meter"
+    // Filters metrics with path "metric.meter"
     client.send(service, QueryHistoryMetrics("metric.meter"))
-    client.expectMsgPF(3 seconds) {
+    client.expectMsgPF(3.seconds) {
       case history: HistoryMetrics =>
         assert(history.path == "metric.meter")
         val metricList = history.metrics
@@ -152,9 +156,9 @@ class HistoryMetricsServiceSpec  extends FlatSpec with Matchers with BeforeAndAf
         )
     }
 
-    // filter metrics with path "metric.histogram"
+    // Filters metrics with path "metric.histogram"
     client.send(service, QueryHistoryMetrics("metric.histogram"))
-    client.expectMsgPF(3 seconds) {
+    client.expectMsgPF(3.seconds) {
       case history: HistoryMetrics =>
         assert(history.path == "metric.histogram")
         val metricList = history.metrics
@@ -163,10 +167,10 @@ class HistoryMetricsServiceSpec  extends FlatSpec with Matchers with BeforeAndAf
         )
     }
 
-    // filter metrics with path prefix "metric", all metrics which can
+    // Filters metrics with path prefix "metric", all metrics which can
     // match the path prefix will be retained.
     client.send(service, QueryHistoryMetrics("metric"))
-    client.expectMsgPF(3 seconds) {
+    client.expectMsgPF(3.seconds) {
       case history: HistoryMetrics =>
         val metricList = history.metrics
 
@@ -179,7 +183,7 @@ class HistoryMetricsServiceSpec  extends FlatSpec with Matchers with BeforeAndAf
             case v: Counter => counterFound = true
             case v: Meter => meterFound = true
             case v: Histogram => histogramFound = true
-            case _ => //skip
+            case _ => // Skip
           }
         )
 
@@ -187,8 +191,7 @@ class HistoryMetricsServiceSpec  extends FlatSpec with Matchers with BeforeAndAf
         assert(counterFound && meterFound && histogramFound)
     }
 
-    system.shutdown()
-    system.awaitTermination()
-
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
index 12128c4..b391196 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,29 +17,31 @@
  */
 package io.gearpump.streaming.appmaster
 
+import scala.concurrent.{Await, Future}
+
 import akka.actor.ActorSystem
-import com.typesafe.config.ConfigFactory
-import io.gearpump.WorkerId
-import io.gearpump.streaming.{ProcessorDescription, DAG}
-import io.gearpump.cluster.{TestUtil, AppJar}
+import org.scalatest.{Matchers, WordSpec}
+
 import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
+import io.gearpump.cluster.{AppJar, TestUtil}
 import io.gearpump.jarstore.FilePath
 import io.gearpump.partitioner.{HashPartitioner, Partitioner}
-import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask2, TestTask1}
+import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
 import io.gearpump.streaming.task.TaskId
-import io.gearpump.streaming._
+import io.gearpump.streaming.{DAG, ProcessorDescription, _}
 import io.gearpump.util.Graph
 import io.gearpump.util.Graph._
-import org.scalatest.{Matchers, WordSpec}
-
-import scala.concurrent.{Await, Future}
 
 class JarSchedulerSpec extends WordSpec with Matchers {
   val mockJar1 = AppJar("jar1", FilePath("path"))
   val mockJar2 = AppJar("jar2", FilePath("path"))
-  val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 1, jar = mockJar1)
-  val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 1, jar = mockJar1)
-  val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask2].getName, parallelism = 2, jar = mockJar2)
+  val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 1,
+    jar = mockJar1)
+  val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 1,
+    jar = mockJar1)
+  val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask2].getName, parallelism = 2,
+    jar = mockJar2)
   val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
 
   import scala.concurrent.duration._
@@ -49,37 +51,46 @@ class JarSchedulerSpec extends WordSpec with Matchers {
       val system = ActorSystem("JarSchedulerSpec")
       implicit val dispatcher = system.dispatcher
       val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system)
-      manager.setDag(dag, Future{0L})
+      manager.setDag(dag, Future {
+        0L
+      })
       val requests = Array(ResourceRequest(Resource(2), WorkerId.unspecified))
-      val result = Await.result(manager.getRequestDetails(), 15 seconds)
+      val result = Await.result(manager.getResourceRequestDetails(), 15.seconds)
       assert(result.length == 1)
       assert(result.head.jar == mockJar1)
       assert(result.head.requests.deep == requests.deep)
 
-      val tasks = Await.result(manager.scheduleTask(mockJar1, WorkerId(0, 0L), 0, Resource(2)), 15 seconds)
+      val tasks = Await.result(manager.scheduleTask(mockJar1, WorkerId(0, 0L), 0,
+        Resource(2)), 15.seconds)
       assert(tasks.contains(TaskId(0, 0)))
       assert(tasks.contains(TaskId(1, 0)))
 
       val newDag = replaceDAG(dag, 1, task3, 1)
 
-      manager.setDag(newDag, Future{0})
-      val requestDetails = Await.result(manager.getRequestDetails().map(_.sortBy(_.jar.name)), 15 seconds)
+      manager.setDag(newDag, Future {
+        0
+      })
+      val requestDetails = Await.result(manager.getResourceRequestDetails().
+        map(_.sortBy(_.jar.name)), 15.seconds)
       assert(requestDetails.length == 2)
       assert(requestDetails.last.jar == mockJar2)
       assert(requestDetails.last.requests.deep == requests.deep)
 
-      system.shutdown()
-      system.awaitTermination()
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
     }
   }
 
-  def replaceDAG(dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int): DAG = {
-    val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth, newProcessor.life.birth)
+  def replaceDAG(
+      dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int)
+    : DAG = {
+    val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth,
+      newProcessor.life.birth)
     val newProcessorMap = dag.processors ++
-        Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife),
-          newProcessor.id -> newProcessor)
+      Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife),
+        newProcessor.id -> newProcessor)
     val newGraph = dag.graph.subGraph(oldProcessorId).
-        replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph)
+      replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph)
     new DAG(newVersion, newProcessorMap, newGraph)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
index c55be84..2e07def 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,15 @@
 
 package io.gearpump.streaming.appmaster
 
-import io.gearpump.WorkerId
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.streaming.appmaster.TaskLocator.Localities
 import io.gearpump.streaming.task.TaskId
-import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
 
 class TaskLocatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
   it should "serialize/deserialize correctly" in {
-    val localities = new Localities(Map(WorkerId(0, 0L) -> Array(TaskId(0, 1), TaskId(1,2))))
+    val localities = new Localities(Map(WorkerId(0, 0L) -> Array(TaskId(0, 1), TaskId(1, 2))))
     Localities.toJson(localities)
 
     localities.localities.mapValues(_.toList) shouldBe

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
index 8105df3..8153fce 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,10 +18,17 @@
 
 package io.gearpump.streaming.appmaster
 
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
 import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.testkit.TestProbe
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
 import io.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
 import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.cluster.{AppJar, TestUtil, UserConfig}
 import io.gearpump.jarstore.FilePath
 import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
@@ -39,11 +46,7 @@ import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId}
 import io.gearpump.transport.HostPort
 import io.gearpump.util.Graph
 import io.gearpump.util.Graph._
-import io.gearpump.{WorkerId, Message, TimeStamp}
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import scala.concurrent.Future
+import io.gearpump.{Message, TimeStamp}
 
 class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
 
@@ -73,8 +76,8 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
   }
 
   override def afterEach(): Unit = {
-    system.shutdown()
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   it should "recover by requesting new executors when executor stopped unexpectedly" in {
@@ -83,15 +86,18 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
     implicit val dispatcher = system.dispatcher
 
     val resourceRequest = Array(ResourceRequest(resource, workerId))
-    when(scheduler.executorFailed(executorId)).thenReturn(Future{Some(ResourceRequestDetail(mockJar, resourceRequest))})
+    when(scheduler.executorFailed(executorId)).thenReturn(Future {
+      Some(ResourceRequestDetail(mockJar,
+        resourceRequest))
+    })
 
     taskManager ! ExecutorStopped(executorId)
 
-    // when one executor stop, it will also trigger the recovery by restart
+    // When one executor stops, it will also trigger the recovery by restarting
     // existing executors
     executorManager.expectMsg(BroadCast(RestartTasks(dagVersion)))
 
-    // ask for new executors
+    // Asks for new executors
     val returned = executorManager.receiveN(1).head.asInstanceOf[StartExecutors]
     assert(returned.resources.deep == resourceRequest.deep)
     executorManager.reply(StartExecutorsTimeOut)
@@ -110,7 +116,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
     executorManager.expectMsg(BroadCast(RestartTasks(dagVersion)))
   }
 
-  import TaskManager.TaskChangeRegistry
+  import io.gearpump.streaming.appmaster.TaskManager.TaskChangeRegistry
   "TaskChangeRegistry" should "track all modified task registration" in {
     val tasks = List(TaskId(0, 0), TaskId(0, 1))
     val registry = new TaskChangeRegistry(tasks)
@@ -155,24 +161,28 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
     val dagManager = TestProbe()
 
     val taskManager = system.actorOf(
-      Props(new TaskManager(appId, dagManager.ref, scheduler, executorManager.ref, clockService.ref, appMaster.ref, "appName")))
+      Props(new TaskManager(appId, dagManager.ref, scheduler, executorManager.ref, clockService.ref,
+        appMaster.ref, "appName")))
 
     dagManager.expectMsgType[WatchChange]
     executorManager.expectMsgType[SetTaskManager]
 
-    // step1: first transition from Unitialized to ApplicationReady
+    // Step1: first transition from Uninitialized to ApplicationReady
     executorManager.expectMsgType[ExecutorResourceUsageSummary]
     dagManager.expectMsgType[NewDAGDeployed]
 
-    // step2: Get Additional Resource Request
-    when(scheduler.getRequestDetails())
-        .thenReturn(Future{Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource, WorkerId.unspecified))))})
+    // Step2: Get Additional Resource Request
+    when(scheduler.getResourceRequestDetails())
+      .thenReturn(Future {
+        Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource,
+          WorkerId.unspecified))))
+      })
 
-    // step3: DAG changed. Start transit from ApplicationReady -> DynamicDAG
+    // Step3: DAG changed. Start transit from ApplicationReady -> DynamicDAG
     dagManager.expectMsg(GetLatestDAG)
     dagManager.reply(LatestDAG(dag))
 
-    // step4: Start remote Executors.
+    // Step4: Start remote Executors.
     // received Broadcast
     executorManager.expectMsg(BroadCast(StartDynamicDag(dag.version)))
     executorManager.expectMsgType[StartExecutors]
@@ -180,10 +190,10 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
     when(scheduler.scheduleTask(mockJar, workerId, executorId, resource))
       .thenReturn(Future(List(TaskId(0, 0), TaskId(1, 0))))
 
-    // step5: Executor is started.
+    // Step5: Executor is started.
     executorManager.reply(ExecutorStarted(executorId, resource, workerId, Some(mockJar)))
 
-    // step6: Prepare to start Task. First GetTaskLaunchData.
+    // Step6: Prepare to start Task. First GetTaskLaunchData.
     val taskLaunchData: PartialFunction[Any, TaskLaunchData] = {
       case GetTaskLaunchData(_, 0, executorStarted) =>
         task1LaunchData.copy(context = executorStarted)
@@ -197,18 +207,17 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
     val launchData2 = dagManager.expectMsgPF()(taskLaunchData)
     dagManager.reply(launchData2)
 
-    // step7: Launch Task
+    // Step7: Launch Task
     val launchTaskMatch: PartialFunction[Any, RegisterTask] = {
       case UniCast(executorId, launch: LaunchTasks) =>
-        Console.println("Launch Task " + launch.processorDescription.id)
         RegisterTask(launch.taskId.head, executorId, HostPort("127.0.0.1:3000"))
     }
 
-    // taskmanager should return the latest start clock to task(0,0)
+    // Taskmanager should return the latest start clock to task(0,0)
     clockService.expectMsg(GetStartClock)
     clockService.reply(StartClock(0))
 
-    // step8: Task is started. registerTask.
+    // Step8: Task is started. registerTask.
     val registerTask1 = executorManager.expectMsgPF()(launchTaskMatch)
     taskManager.tell(registerTask1, executor.ref)
     executor.expectMsgType[TaskRegistered]
@@ -217,53 +226,51 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
     taskManager.tell(registerTask2, executor.ref)
     executor.expectMsgType[TaskRegistered]
 
-    // step9: start broadcasting TaskLocations.
+    // Step9: start broadcasting TaskLocations.
     import scala.concurrent.duration._
-    assert(executorManager.expectMsgPF(5 seconds) {
+    assert(executorManager.expectMsgPF(5.seconds) {
       case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[TaskLocationsReady]
     })
 
-    //step10: Executor confirm it has received TaskLocationsReceived(version, executorId)
+    // Step10: Executor confirms receipt by sending TaskLocationsReceived(version, executorId)
     taskManager.tell(TaskLocationsReceived(dag.version, executorId), executor.ref)
 
-
-    // step11: Tell ClockService to update DAG.
+    // Step11: Tell ClockService to update DAG.
     clockService.expectMsgType[ChangeToNewDAG]
     clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, TimeStamp]))
 
-
-    //step12: start all tasks
+    // Step12: start all tasks
     import scala.concurrent.duration._
-    assert(executorManager.expectMsgPF(5 seconds) {
+    assert(executorManager.expectMsgPF(5.seconds) {
       case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[StartAllTasks]
     })
 
-    // step13, Tell executor Manager the updated usage status of executors.
+    // Step13: Tell the executor manager the updated usage status of executors.
     executorManager.expectMsgType[ExecutorResourceUsageSummary]
 
-    // step14: transition from DynamicDAG to ApplicationReady
+    // Step14: transition from DynamicDAG to ApplicationReady
     Env(executorManager, clockService, appMaster, executor, taskManager, scheduler)
   }
 }
 
 object TaskManagerSpec {
   case class Env(
-    executorManager: TestProbe,
-    clockService: TestProbe,
-    appMaster: TestProbe,
-    executor: TestProbe,
-    taskManager: ActorRef,
-    scheduler: JarScheduler)
-
-  class Task1(taskContext : TaskContext, userConf : UserConfig)
+      executorManager: TestProbe,
+      clockService: TestProbe,
+      appMaster: TestProbe,
+      executor: TestProbe,
+      taskManager: ActorRef,
+      scheduler: JarScheduler)
+
+  class Task1(taskContext: TaskContext, userConf: UserConfig)
     extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = ???
-    override def onNext(msg: Message): Unit = ???
+    override def onStart(startTime: StartTime): Unit = {}
+    override def onNext(msg: Message): Unit = {}
   }
 
-  class Task2 (taskContext : TaskContext, userConf : UserConfig)
+  class Task2(taskContext: TaskContext, userConf: UserConfig)
     extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = ???
-    override def onNext(msg: Message): Unit = ???
+    override def onStart(startTime: StartTime): Unit = {}
+    override def onNext(msg: Message): Unit = {}
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
index ecac824..e8417ea 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
@@ -18,12 +18,12 @@
 
 package io.gearpump.streaming.appmaster
 
-import io.gearpump.streaming.appmaster.TaskRegistry.{Reject, Accept}
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
 import io.gearpump.cluster.scheduler.Resource
 import io.gearpump.streaming.appmaster.TaskRegistry.{Accept, Reject, TaskLocation, TaskLocations}
 import io.gearpump.streaming.task.TaskId
 import io.gearpump.transport.HostPort
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
 class TaskRegistrySpec extends FlatSpec with Matchers with BeforeAndAfterEach {
 
   it should "maintain registered tasks" in {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
index d2373ea..2c64133 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,23 +17,22 @@
  */
 package io.gearpump.streaming.appmaster
 
+import scala.collection.mutable.ArrayBuffer
+
 import com.typesafe.config.ConfigFactory
-import io.gearpump.streaming.Constants
-import io.gearpump.streaming.appmaster.TaskLocator.Localities
-import io.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
-import io.gearpump.{WorkerId, Message}
+import org.scalatest.{Matchers, WordSpec}
+
+import io.gearpump.Message
 import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
-import io.gearpump.cluster.{TestUtil, ClusterConfig, UserConfig}
+import io.gearpump.cluster.worker.WorkerId
+import io.gearpump.cluster.{TestUtil, UserConfig}
 import io.gearpump.partitioner.{HashPartitioner, Partitioner}
 import io.gearpump.streaming.appmaster.TaskLocator.Localities
 import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
 import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import io.gearpump.streaming.{DAG, ProcessorDescription}
+import io.gearpump.streaming.{Constants, DAG, ProcessorDescription}
 import io.gearpump.util.Graph
 import io.gearpump.util.Graph._
-import org.scalatest.{Matchers, WordSpec}
-
-import scala.collection.mutable.ArrayBuffer
 
 class TaskSchedulerSpec extends WordSpec with Matchers {
   val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 4)
@@ -47,19 +46,19 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
     "schedule tasks on different workers properly according user's configuration" in {
 
       val localities = Localities(
-        Map(WorkerId(1, 0L) -> Array(TaskId(0,0), TaskId(0,1), TaskId(1,0), TaskId(1,1)),
-          WorkerId(2, 0L) -> Array(TaskId(0,2), TaskId(0,3))
-      ))
+        Map(WorkerId(1, 0L) -> Array(TaskId(0, 0), TaskId(0, 1), TaskId(1, 0), TaskId(1, 1)),
+          WorkerId(2, 0L) -> Array(TaskId(0, 2), TaskId(0, 3))
+        ))
 
       val localityConfig = ConfigFactory.parseString(Localities.toJson(localities))
 
-      import Constants.GEARPUMP_STREAMING_LOCALITIES
+      import io.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES
       val appName = "app"
       val taskScheduler = new TaskSchedulerImpl(appId = 0, appName,
         config.withValue(s"$GEARPUMP_STREAMING_LOCALITIES.$appName", localityConfig.root))
 
       val expectedRequests =
-        Array( ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER),
+        Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER),
           ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER))
 
       taskScheduler.setDAG(dag)
@@ -71,28 +70,32 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
       val tasksOnWorker1 = ArrayBuffer[Int]()
       val tasksOnWorker2 = ArrayBuffer[Int]()
       for (i <- 0 until 4) {
-        tasksOnWorker1.append(taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, Resource(1)).head.processorId)
+        tasksOnWorker1.append(taskScheduler.schedule(WorkerId(1, 0L),
+          executorId = 0, Resource(1)).head.processorId)
       }
       for (i <- 0 until 2) {
-        tasksOnWorker2.append(taskScheduler.schedule(WorkerId(2, 0L), executorId = 1, Resource(1)).head.processorId)
+        tasksOnWorker2.append(taskScheduler.schedule(WorkerId(2, 0L), executorId = 1,
+          Resource(1)).head.processorId)
       }
 
-      //allocate more resource, and no tasks to launch
-      assert(taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, Resource(1)) == List.empty[TaskId])
+      // Allocates more resources when there are no tasks left to launch
+      assert(taskScheduler.schedule(WorkerId(3, 0L), executorId = 3,
+        Resource(1)) == List.empty[TaskId])
 
-      //on worker1, executor 0
+      // On worker1, executor 0
       assert(tasksOnWorker1.sorted.sameElements(Array(0, 0, 1, 1)))
 
-      //on worker2, executor 1, Task(0, 0), Task(0, 1)
+      // On worker2, executor 1, Task(0, 0), Task(0, 1)
       assert(tasksOnWorker2.sorted.sameElements(Array(0, 0)))
 
       val rescheduledResources = taskScheduler.executorFailed(executorId = 1)
 
-      assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2), WorkerId.unspecified, relaxation = Relaxation.ONEWORKER))))
+      assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2),
+        WorkerId.unspecified, relaxation = Relaxation.ONEWORKER))))
 
       val launchedTask = taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, Resource(2))
 
-      //start the failed 2 tasks Task(0, 0) and Task(0, 1)
+      // Starts the 2 failed tasks, Task(0, 0) and Task(0, 1)
       assert(launchedTask.length == 2)
     }
 
@@ -101,7 +104,7 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
       val taskScheduler = new TaskSchedulerImpl(appId = 0, appName, config)
 
       val expectedRequests =
-        Array( ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER),
+        Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER),
           ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER))
 
       taskScheduler.setDAG(dag)
@@ -112,16 +115,16 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
   }
 }
 
-object TaskSchedulerSpec{
-  class TestTask1(taskContext : TaskContext, userConf : UserConfig)
-      extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = ???
-    override def onNext(msg: Message): Unit = ???
+object TaskSchedulerSpec {
+  class TestTask1(taskContext: TaskContext, userConf: UserConfig)
+    extends Task(taskContext, userConf) {
+    override def onStart(startTime: StartTime): Unit = Unit
+    override def onNext(msg: Message): Unit = Unit
   }
 
-  class TestTask2(taskContext : TaskContext, userConf : UserConfig)
-      extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = ???
-    override def onNext(msg: Message): Unit = ???
+  class TestTask2(taskContext: TaskContext, userConf: UserConfig)
+    extends Task(taskContext, userConf) {
+    override def onStart(startTime: StartTime): Unit = Unit
+    override def onNext(msg: Message): Unit = Unit
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
index 89d2d64..132d46c 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,28 +18,30 @@
 
 package io.gearpump.streaming.dsl
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.ActorSystem
-import io.gearpump.streaming.dsl.plan.OpTranslator.SourceTask
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.streaming.dsl.plan.OpTranslator._
 import org.mockito.Mockito.when
 import org.scalatest._
 import org.scalatest.mock.MockitoSugar
-class StreamAppSpec  extends FlatSpec with Matchers with BeforeAndAfterAll  with MockitoSugar {
+
+import io.gearpump.cluster.TestUtil
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.streaming.dsl.plan.OpTranslator.SourceTask
+class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
 
   implicit var system: ActorSystem = null
 
-  override def beforeAll: Unit = {
-    system = ActorSystem("test",  TestUtil.DEFAULT_CONFIG)
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
   }
 
-  override def afterAll: Unit = {
-    system.shutdown()
-    system.awaitTermination()
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
-
   it should "be able to generate multiple new streams" in {
     val context: ClientContext = mock[ClientContext]
     when(context.system).thenReturn(system)
@@ -57,7 +59,7 @@ class StreamAppSpec  extends FlatSpec with Matchers with BeforeAndAfterAll  with
 
     val app = StreamApp("dsl", context)
     val parallism = 3
-    app.source(List("A","B","C"), parallism, "").flatMap(Array(_)).reduce(_+_)
+    app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ + _)
     val task = app.plan.dag.vertices.iterator.next()
     assert(task.taskClass == classOf[SourceTask[_, _]].getName)
     assert(task.parallelism == parallism)
@@ -72,7 +74,7 @@ class StreamAppSpec  extends FlatSpec with Matchers with BeforeAndAfterAll  with
       "1",
       "2"
     )
-    val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_+_)
+    val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_ + _)
     val task = app.plan.dag.vertices.iterator.next()
       /*
       val task = app.plan.dag.vertices.iterator.map(desc => {