Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:54 UTC

[44/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
index 8d4515a..6a4ac07 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,23 +19,24 @@
 package io.gearpump.cluster.main
 
 import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
 
 import akka.actor._
 import akka.cluster.ClusterEvent._
 import akka.cluster.ddata.DistributedData
+import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings}
 import akka.cluster.{Cluster, Member, MemberStatus}
-import akka.cluster.singleton.{ClusterSingletonManagerSettings, ClusterSingletonProxySettings, ClusterSingletonManager, ClusterSingletonProxy}
 import com.typesafe.config.ConfigValueFactory
+import org.slf4j.Logger
+
 import io.gearpump.cluster.ClusterConfig
 import io.gearpump.cluster.master.{Master => MasterActor}
 import io.gearpump.util.Constants._
 import io.gearpump.util.LogUtil.ProcessType
 import io.gearpump.util.{AkkaApp, Constants, LogUtil}
-import org.slf4j.Logger
-
-import scala.collection.JavaConverters._
-import scala.collection.immutable
-import scala.concurrent.duration._
 
 object Master extends AkkaApp with ArgumentsParser {
 
@@ -44,14 +45,14 @@ object Master extends AkkaApp with ArgumentsParser {
   override def akkaConfig: Config = ClusterConfig.master()
 
   override val options: Array[(String, CLIOption[Any])] =
-    Array("ip"->CLIOption[String]("<master ip address>",required = true),
-      "port"->CLIOption("<master port>",required = true))
+    Array("ip" -> CLIOption[String]("<master ip address>", required = true),
+      "port" -> CLIOption("<master port>", required = true))
 
   override val description = "Start Master daemon"
 
   def main(akkaConf: Config, args: Array[String]): Unit = {
 
-    this.LOG  =  {
+    this.LOG = {
       LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER)
       LogUtil.getLogger(getClass)
     }
@@ -60,27 +61,29 @@ object Master extends AkkaApp with ArgumentsParser {
     master(config.getString("ip"), config.getInt("port"), akkaConf)
   }
 
-  def verifyMaster(master : String, port: Int, masters : Iterable[String])  = {
-    masters.exists{ hostPort =>
+  private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = {
+    masters.exists { hostPort =>
       hostPort == s"$master:$port"
     }
   }
 
-  def master(ip:String, port : Int, akkaConf: Config): Unit = {
+  private def master(ip: String, port: Int, akkaConf: Config): Unit = {
     val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
 
     if (!verifyMaster(ip, port, masters)) {
-      LOG.error(s"The provided ip $ip and port $port doesn't conform with config at gearpump.cluster.masters: ${masters.mkString(", ")}")
+      LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " +
+        s"gearpump.cluster.masters: ${masters.mkString(", ")}")
       System.exit(-1)
     }
 
     val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava
-    val quorum = masterList.size() /2  + 1
+    val quorum = masterList.size() / 2 + 1
     val masterConfig = akkaConf.
       withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
       withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)).
       withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)).
-      withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members", ConfigValueFactory.fromAnyRef(quorum))
+      withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members",
+        ConfigValueFactory.fromAnyRef(quorum))
 
     LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}")
     val system = ActorSystem(MASTER, masterConfig)
@@ -88,19 +91,21 @@ object Master extends AkkaApp with ArgumentsParser {
     val replicator = DistributedData(system).replicator
     LOG.info(s"Replicator path: ${replicator.path}")
 
-    //start singleton manager
+    // Starts singleton manager
     val singletonManager = system.actorOf(ClusterSingletonManager.props(
       singletonProps = Props(classOf[MasterWatcher], MASTER),
       terminationMessage = PoisonPill,
-      settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER).withRole(MASTER)),
+      settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER)
+        .withRole(MASTER)),
       name = SINGLETON_MANAGER)
 
-    //start master proxy
+    // Start master proxy
     val masterProxy = system.actorOf(ClusterSingletonProxy.props(
       singletonManagerPath = s"/user/${SINGLETON_MANAGER}",
       // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}".
-      // Master will only be created when there is a majority of machines started.
-      settings = ClusterSingletonProxySettings(system).withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
+      // Master is created when there is a majority of machines started.
+      settings = ClusterSingletonProxySettings(system)
+        .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
       name = MASTER
     )
 
@@ -108,8 +113,8 @@ object Master extends AkkaApp with ArgumentsParser {
 
     val mainThread = Thread.currentThread()
     Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run() : Unit = {
-        if (!system.isTerminated) {
+      override def run(): Unit = {
+        if (!system.whenTerminated.isCompleted) {
           LOG.info("Triggering shutdown hook....")
 
           system.stop(masterProxy)
@@ -117,21 +122,21 @@ object Master extends AkkaApp with ArgumentsParser {
           cluster.leave(cluster.selfAddress)
           cluster.down(cluster.selfAddress)
           try {
-            system.awaitTermination(Duration(3, TimeUnit.SECONDS))
+            Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
           } catch {
-            case ex : Exception => //ignore
+            case ex: Exception => // Ignore
           }
-          system.shutdown()
+          system.terminate()
           mainThread.join()
         }
       }
     })
 
-    system.awaitTermination()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }
 
-class MasterWatcher(role: String) extends Actor  with ActorLogging {
+class MasterWatcher(role: String) extends Actor with ActorLogging {
   import context.dispatcher
 
   val cluster = Cluster(context.system)
@@ -142,13 +147,13 @@ class MasterWatcher(role: String) extends Actor  with ActorLogging {
 
   val system = context.system
 
-  // sort by age, oldest first
+  // Sorts by age, oldest first
   val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) }
   var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
 
-  def receive : Receive = null
+  def receive: Receive = null
 
-  // subscribe to MemberEvent, re-subscribe when restart
+  // Subscribes to MemberEvent, re-subscribes on restart
   override def preStart(): Unit = {
     cluster.subscribe(self, classOf[MemberEvent])
     context.become(waitForInit)
@@ -159,14 +164,15 @@ class MasterWatcher(role: String) extends Actor  with ActorLogging {
 
   def matchingRole(member: Member): Boolean = member.hasRole(role)
 
-  def waitForInit : Receive = {
+  def waitForInit: Receive = {
     case state: CurrentClusterState => {
       membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m =>
         m.status == MemberStatus.Up && matchingRole(m))
 
       if (membersByAge.size < quorum) {
         membersByAge.iterator.mkString(",")
-        log.info(s"We cannot get a quorum, $quorum, shutting down...${membersByAge.iterator.mkString(",")}")
+        log.info(s"We cannot get a quorum, $quorum, " +
+          s"shutting down...${membersByAge.iterator.mkString(",")}")
         context.become(waitForShutdown)
         self ! MasterWatcher.Shutdown
       } else {
@@ -176,34 +182,36 @@ class MasterWatcher(role: String) extends Actor  with ActorLogging {
     }
   }
 
-  def waitForClusterEvent : Receive = {
-    case MemberUp(m) if matchingRole(m)  => {
+  def waitForClusterEvent: Receive = {
+    case MemberUp(m) if matchingRole(m) => {
       membersByAge += m
     }
-    case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => {
+    case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] ||
+      mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => {
       log.info(s"member removed ${mEvent.member}")
       val m = mEvent.member
       membersByAge -= m
       if (membersByAge.size < quorum) {
-        log.info(s"We cannot get a quorum, $quorum, shutting down...${membersByAge.iterator.mkString(",")}")
+        log.info(s"We cannot get a quorum, $quorum, " +
+          s"shutting down...${membersByAge.iterator.mkString(",")}")
         context.become(waitForShutdown)
         self ! MasterWatcher.Shutdown
       }
     }
   }
 
-  def waitForShutdown : Receive = {
+  def waitForShutdown: Receive = {
     case MasterWatcher.Shutdown => {
       cluster.unsubscribe(self)
       cluster.leave(cluster.selfAddress)
       context.stop(self)
       system.scheduler.scheduleOnce(Duration.Zero) {
         try {
-          system.awaitTermination(Duration(3, TimeUnit.SECONDS))
+          Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
         } catch {
-          case ex : Exception => //ignore
+          case ex: Exception => // Ignore
         }
-        system.shutdown()
+        system.terminate()
       }
     }
   }
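
For readers following the API change rather than the formatting: the recurring edit in this file replaces Akka's deprecated blocking shutdown calls (system.awaitTermination, system.shutdown) with the Future-based termination API. A minimal standalone sketch of that pattern, using illustrative names rather than the Gearpump sources:

    import java.util.concurrent.TimeUnit
    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    import akka.actor.ActorSystem

    object TerminationSketch extends App {
      val system = ActorSystem("sketch")

      // terminate() asks the system to shut down and returns a Future[Terminated].
      system.terminate()

      // whenTerminated completes once shutdown has finished; block on it with a
      // bounded wait, as the shutdown hook and MasterWatcher above do.
      Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
    }

The same substitution appears again in MasterWatcher above and in Worker.scala below.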

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala
index 451376e..c9a6e9c 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,12 +17,12 @@
  */
 package io.gearpump.cluster.main
 
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.util.{AkkaApp, LogUtil}
 import org.slf4j.Logger
 
-import scala.util.Try
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.util.{AkkaApp, LogUtil}
 
+// Internal tool to restart an application
 object Replay extends AkkaApp with ArgumentsParser {
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
@@ -31,19 +31,18 @@ object Replay extends AkkaApp with ArgumentsParser {
     "appid" -> CLIOption("<application id>", required = true),
     // For document purpose only, OPTION_CONFIG option is not used here.
     // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None))
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
 
   override val description = "Replay the application from current min clock(low watermark)"
 
   def main(akkaConf: Config, args: Array[String]): Unit = {
     val config = parse(args)
 
-    if (null == config) {
-      return
+    if (null != config) {
+      val client = ClientContext(akkaConf)
+      client.replayFromTimestampWindowTrailingEdge(config.getInt("appid"))
+      client.close()
     }
-
-    val client = ClientContext(akkaConf)
-    client.replayFromTimestampWindowTrailingEdge(config.getInt("appid"))
-    client.close()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala
index 66fe25b..4818262 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,34 +18,38 @@
 
 package io.gearpump.cluster.main
 
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{ActorSystem, Props}
+import org.slf4j.Logger
+
 import io.gearpump.cluster.ClusterConfig
 import io.gearpump.cluster.master.MasterProxy
 import io.gearpump.cluster.worker.{Worker => WorkerActor}
 import io.gearpump.transport.HostPort
 import io.gearpump.util.Constants._
-import io.gearpump.util.{AkkaApp, LogUtil}
 import io.gearpump.util.LogUtil.ProcessType
-import org.slf4j.Logger
-
-import scala.collection.JavaConverters._
-import scala.util.Try
+import io.gearpump.util.{AkkaApp, LogUtil}
 
+/** Tool to start a worker daemon process */
 object Worker extends AkkaApp with ArgumentsParser {
-  override def akkaConfig = ClusterConfig.worker()
+  protected override def akkaConfig = ClusterConfig.worker()
 
   override val description = "Start a worker daemon"
 
-  var LOG : Logger = LogUtil.getLogger(getClass)
+  var LOG: Logger = LogUtil.getLogger(getClass)
 
-  def uuid = java.util.UUID.randomUUID.toString
+  private def uuid = java.util.UUID.randomUUID.toString
 
   def main(akkaConf: Config, args: Array[String]): Unit = {
     val id = uuid
 
     this.LOG = {
       LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER)
-      //delay creation of LOG instance to avoid creating an empty log file as we reset the log file name here
+      // Delay creation of LOG instance to avoid creating an empty log file as we
+      // reset the log file name here
       LogUtil.getLogger(getClass)
     }
 
@@ -62,6 +66,6 @@ object Worker extends AkkaApp with ArgumentsParser {
     system.actorOf(Props(classOf[WorkerActor], masterProxy),
       classOf[WorkerActor].getSimpleName + id)
 
-    system.awaitTermination()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }
\ No newline at end of file
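
Beyond the awaitTermination change, most of the churn in this file is import regrouping. The ordering the commit applies throughout (as read off the hunks, not an official style spec) is: JDK and Scala standard library first, then third-party dependencies, then project packages, separated by blank lines:

    // Group 1: java.* and scala.* standard library
    import java.util.concurrent.TimeUnit
    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    // Group 2: third-party dependencies (Akka, Typesafe Config, SLF4J, ...)
    import akka.actor.ActorSystem
    import com.typesafe.config.Config
    import org.slf4j.Logger

    // Group 3: project-local io.gearpump packages
    import io.gearpump.util.LogUtil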

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala b/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
index e6bd1db..058533e 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,8 +18,14 @@
 
 package io.gearpump.cluster.master
 
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
+
 import akka.actor._
 import akka.pattern.ask
+import org.slf4j.Logger
+
 import io.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
 import io.gearpump.cluster.AppMasterToWorker._
 import io.gearpump.cluster.ClientToMaster._
@@ -32,32 +38,29 @@ import io.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, P
 import io.gearpump.cluster.master.Master._
 import io.gearpump.util.Constants._
 import io.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}
-import org.slf4j.Logger
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.util.{Failure, Success}
 
 /**
- * AppManager is dedicated part of Master to manager all applicaitons.
+ * AppManager is a dedicated child of Master to manage all applications.
  */
-private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory) extends Actor with Stash with TimeOutScheduler{
+private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory)
+  extends Actor with Stash with TimeOutScheduler {
+
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
-  private val executorId : Int = APPMASTER_DEFAULT_EXECUTOR_ID
+  private val executorId: Int = APPMASTER_DEFAULT_EXECUTOR_ID
   private val appMasterMaxRetries: Int = 5
-  private val appMasterRetryTimeRange: Duration = 20 seconds
+  private val appMasterRetryTimeRange: Duration = 20.seconds
 
   implicit val timeout = FUTURE_TIMEOUT
   implicit val executionContext = context.dispatcher
 
-  //next available appId
+  // Next available appId
   private var appId: Int = 1
 
-  //from appid to appMaster data
+  // From appid to appMaster data
   private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
 
-  // dead appmaster list
+  // Dead appmaster list
   private var deadAppMasters = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
 
   private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
@@ -70,7 +73,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
   def waitForMasterState: Receive = {
     case GetKVSuccess(_, result) =>
       val masterState = result.asInstanceOf[MasterState]
-      if(masterState != null) {
+      if (masterState != null) {
         this.appId = masterState.maxId + 1
         this.deadAppMasters = masterState.deadAppMasters
         this.appMasterRegistry = masterState.appMasterRegistry
@@ -85,11 +88,11 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
       stash()
   }
 
-  def receiveHandler : Receive = {
+  def receiveHandler: Receive = {
     val msg = "Application Manager started. Ready for application submission..."
-    System.out.println(msg)
     LOG.info(msg)
-    clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse appDataStoreService orElse terminationWatch
+    clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse
+      appDataStoreService orElse terminationWatch
   }
 
   def clientMsgHandler: Receive = {
@@ -97,12 +100,15 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
       LOG.info(s"Submit Application ${app.name}($appId) by $username...")
       val client = sender
       if (applicationNameExist(app.name)) {
-        client ! SubmitApplicationResult(Failure(new Exception(s"Application name ${app.name} already existed")))
+        client ! SubmitApplicationResult(Failure(
+          new Exception(s"Application name ${app.name} already existed")))
       } else {
-        context.actorOf(launcher.props(appId, executorId, app, jar, username, context.parent, Some(client)), s"launcher${appId}_${Util.randInt}")
+        context.actorOf(launcher.props(appId, executorId, app, jar, username, context.parent,
+          Some(client)), s"launcher${appId}_${Util.randInt()}")
 
         val appState = new ApplicationState(appId, app.name, 0, app, jar, username, null)
-        appMasterRestartPolicies += appId -> new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
+        appMasterRestartPolicies += appId ->
+          new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
         kvService ! PutKV(appId.toString, APP_STATE, appState)
         appId += 1
       }
@@ -123,7 +129,8 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
           }
         case GetKVFailed(ex) =>
           client ! SubmitApplicationResult(Failure(
-            new Exception(s"Unable to obtain the Master State. Application $appId will not be restarted.")
+            new Exception(s"Unable to obtain the Master State. " +
+              s"Application $appId will not be restarted.")
           ))
       }
 
@@ -133,9 +140,11 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
       Option(info) match {
         case Some(info) =>
           val worker = info.worker
-          LOG.info(s"Shutdown AppMaster at ${Option(worker).map(_.path).orNull}, appId: $appId, executorId: $executorId")
+          val workerPath = Option(worker).map(_.path).orNull
+          LOG.info(s"Shutdown AppMaster at ${workerPath}, appId: $appId, executorId: $executorId")
           cleanApplicationData(appId)
-          val shutdown = ShutdownExecutor(appId, executorId, s"AppMaster $appId shutdown requested by master...")
+          val shutdown = ShutdownExecutor(appId, executorId,
+            s"AppMaster $appId shutdown requested by master...")
           sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut())
           sender ! ShutdownApplicationResult(Success(appId))
         case None =>
@@ -153,18 +162,20 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
     case AppMastersDataRequest =>
       var appMastersData = collection.mutable.ListBuffer[AppMasterData]()
       appMasterRegistry.foreach(pair => {
-        val (id, (appMaster:ActorRef, info: AppMasterRuntimeInfo)) = pair
+        val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
         val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
-        val workerPath = Option(info.worker).map(worker => ActorUtil.getFullPath(context.system, worker.path))
+        val workerPath = Option(info.worker).map(worker =>
+          ActorUtil.getFullPath(context.system, worker.path))
         appMastersData += AppMasterData(
           AppMasterActive, id, info.appName, appMasterPath, workerPath.orNull,
           info.submissionTime, info.startTime, info.finishTime, info.user)
       })
 
       deadAppMasters.foreach(pair => {
-        val (id, (appMaster:ActorRef, info:AppMasterRuntimeInfo)) = pair
+        val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
         val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
-        val workerPath = Option(info.worker).map(worker => ActorUtil.getFullPath(context.system, worker.path))
+        val workerPath = Option(info.worker).map(worker =>
+          ActorUtil.getFullPath(context.system, worker.path))
 
         appMastersData += AppMasterData(
           AppMasterInActive, id, info.appName, appMasterPath, workerPath.orNull,
@@ -218,7 +229,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
     case failed: ShutdownExecutorFailed =>
       LOG.error(failed.reason)
   }
-  
+
   private def shutDownExecutorTimeOut(): Unit = {
     LOG.error(s"Shut down executor time out")
   }
@@ -231,7 +242,8 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
       LOG.info(s"Register AppMaster for app: ${register.appId} $register")
       context.watch(appMaster)
       appMasterRegistry += register.appId -> (appMaster, register)
-      kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(appId, appMasterRegistry, deadAppMasters))
+      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+        MasterState(appId, appMasterRegistry, deadAppMasters))
       sender ! AppMasterRegistered(register.appId)
   }
 
@@ -257,13 +269,16 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
   def terminationWatch: Receive = {
     case terminate: Terminated =>
       terminate.getAddressTerminated()
-      LOG.info(s"AppMaster(${terminate.actor.path}) is terminiated, network down: ${terminate.getAddressTerminated()}")
-      //Now we assume that the only normal way to stop the application is submitting a ShutdownApplication request
-      val application = appMasterRegistry.find{appInfo =>
+      LOG.info(s"AppMaster(${terminate.actor.path}) is terminiated, " +
+        s"network down: ${terminate.getAddressTerminated()}")
+
+      // Now we assume that the only normal way to stop the application is submitting a
+      // ShutdownApplication request
+      val application = appMasterRegistry.find { appInfo =>
         val (_, (actorRef, _)) = appInfo
         actorRef.compareTo(terminate.actor) == 0
       }
-      if(application.nonEmpty){
+      if (application.nonEmpty) {
         val appId = application.get._1
         (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
           case GetKVSuccess(_, result) =>
@@ -283,26 +298,29 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
   def selfMsgHandler: Receive = {
     case RecoverApplication(state) =>
       val appId = state.appId
-      if(appMasterRestartPolicies.get(appId).get.allowRestart) {
+      if (appMasterRestartPolicies.get(appId).get.allowRestart) {
         LOG.info(s"AppManager Recovering Application $appId...")
-        context.actorOf(launcher.props(appId, executorId, state.app, state.jar, state.username, context.parent, None), s"launcher${appId}_${Util.randInt}")
+        context.actorOf(launcher.props(appId, executorId, state.app, state.jar, state.username,
+          context.parent, None), s"launcher${appId}_${Util.randInt()}")
       } else {
         LOG.error(s"Application $appId failed too many times")
       }
   }
 
-  case class RecoverApplication(applicationStatus : ApplicationState)
+  case class RecoverApplication(applicationStatus: ApplicationState)
 
-  private def cleanApplicationData(appId : Int) : Unit = {
-    //add the dead app to dead appMaster
+  private def cleanApplicationData(appId: Int): Unit = {
+    // Add the dead app to dead appMaster
     appMasterRegistry.get(appId).foreach { pair =>
       val (appMasterActor, info) = pair
-      deadAppMasters += appId -> (appMasterActor, info.copy(finishTime = System.currentTimeMillis()))
+      deadAppMasters += appId -> (appMasterActor, info.copy(
+        finishTime = System.currentTimeMillis()))
     }
 
     appMasterRegistry -= appId
 
-    kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(this.appId, appMasterRegistry, deadAppMasters))
+    kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+      MasterState(this.appId, appMasterRegistry, deadAppMasters))
     kvService ! DeleteKVGroup(appId.toString)
   }
 
@@ -313,7 +331,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
 
 object AppManager {
   final val APP_STATE = "app_state"
-  //The id is used in KVStore
+  // The id is used in KVStore
   final val MASTER_STATE = "master_state"
 
   case class MasterState(

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala b/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
index fb66a0c..616d3ee 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
@@ -19,21 +19,22 @@
 package io.gearpump.cluster.master
 
 import java.util.concurrent.TimeUnit
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration.Duration
 
 import akka.actor._
 import akka.cluster.Cluster
-import akka.cluster.ddata.{DistributedData, LWWMap, Key, LWWMapKey}
 import akka.cluster.ddata.Replicator._
-import io.gearpump.util.{LogUtil}
+import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey}
 import org.slf4j.Logger
-import scala.concurrent.TimeoutException
-import scala.concurrent.duration.Duration
+
+import io.gearpump.util.LogUtil
 
 /**
- * A replicated simple in-memory KV service.
+ * A simple replicated in-memory KV service. Replicas are stored on all masters.
  */
 class InMemoryKVService extends Actor with Stash {
-  import InMemoryKVService._
+  import io.gearpump.cluster.master.InMemoryKVService._
 
   private val KV_SERVICE = "gearpump_kvservice"
 
@@ -41,7 +42,7 @@ class InMemoryKVService extends Actor with Stash {
   private val replicator = DistributedData(context.system).replicator
   private implicit val cluster = Cluster(context.system)
 
-  //optimize write path, we can tolerate one master down for recovery.
+  // Optimize write path, we can tolerate one master down for recovery.
   private val timeout = Duration(15, TimeUnit.SECONDS)
   private val readMajority = ReadMajority(timeout)
   private val writeMajority = WriteMajority(timeout)
@@ -50,39 +51,39 @@ class InMemoryKVService extends Actor with Stash {
     LWWMapKey[Any](KV_SERVICE + "_" + group)
   }
 
-  def receive : Receive = kvService
+  def receive: Receive = kvService
 
-  def kvService : Receive = {
+  def kvService: Receive = {
 
-    case GetKV(group: String, key : String) =>
+    case GetKV(group: String, key: String) =>
       val request = Request(sender(), key)
       replicator ! Get(groupKey(group), readMajority, Some(request))
-    case success@ GetSuccess(group: LWWMapKey[Any], Some(request: Request)) =>
+    case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
       val appData = success.get(group)
       LOG.info(s"Successfully retrived group: ${group.id}")
       request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull)
-    case NotFound(group: LWWMapKey[Any], Some(request: Request)) =>
+    case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
       LOG.info(s"We cannot find group $group")
       request.client ! GetKVSuccess(request.key, null)
-    case GetFailure(group: LWWMapKey[Any], Some(request: Request)) =>
+    case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
       val error = s"Failed to get application data, the request key is ${request.key}"
       LOG.error(error)
       request.client ! GetKVFailed(new Exception(error))
 
     case PutKV(group: String, key: String, value: Any) =>
       val request = Request(sender(), key)
-      val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) {map =>
+      val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) { map =>
         map + (key -> value)
       }
       replicator ! update
-    case UpdateSuccess(group: LWWMapKey[Any], Some(request: Request)) =>
-        request.client ! PutKVSuccess
-    case ModifyFailure(group: LWWMapKey[Any], error, cause, Some(request: Request)) =>
+    case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      request.client ! PutKVSuccess
+    case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) =>
       request.client ! PutKVFailed(request.key, new Exception(error, cause))
-    case UpdateTimeout(group: LWWMapKey[Any], Some(request: Request)) =>
+    case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
       request.client ! PutKVFailed(request.key, new TimeoutException())
 
-    case delete@ DeleteKVGroup(group: String) =>
+    case delete@DeleteKVGroup(group: String) =>
       replicator ! Delete(groupKey(group), writeMajority)
     case DeleteSuccess(group) =>
       LOG.info(s"KV Group ${group.id} is deleted")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
index fdef42e..0203237 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,20 +19,26 @@
 package io.gearpump.cluster.master
 
 import java.lang.management.ManagementFactory
+import scala.collection.JavaConverters._
+import scala.collection.immutable
 
 import akka.actor._
 import akka.remote.DisassociatedEvent
 import com.typesafe.config.Config
+import org.apache.commons.lang.exception.ExceptionUtils
+import org.slf4j.Logger
+
 import io.gearpump.cluster.AppMasterToMaster._
 import io.gearpump.cluster.ClientToMaster._
 import io.gearpump.cluster.ClusterConfig
 import io.gearpump.cluster.MasterToAppMaster._
-import io.gearpump.cluster.MasterToClient.{HistoryMetricsItem, HistoryMetrics, MasterConfig, ResolveWorkerIdResult}
+import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult}
 import io.gearpump.cluster.MasterToWorker._
 import io.gearpump.cluster.WorkerToMaster._
 import io.gearpump.cluster.master.InMemoryKVService._
 import io.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
 import io.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.jarstore.local.LocalJarStore
 import io.gearpump.metrics.Metrics.ReportMetrics
 import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
@@ -40,31 +46,22 @@ import io.gearpump.transport.HostPort
 import io.gearpump.util.Constants._
 import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
 import io.gearpump.util._
-import io.gearpump.WorkerId
-
-import org.apache.commons.lang.exception.ExceptionUtils
-import org.slf4j.Logger
-
-import scala.collection.JavaConverters._
-import scala.collection.immutable
 
 /**
- * Master manage resources of the whole cluster.
+ * Master Actor who manages resources of the whole cluster.
  * It is like the resource manager of YARN.
- *
  */
 private[cluster] class Master extends Actor with Stash {
   private val LOG: Logger = LogUtil.getLogger(getClass)
-  private val systemConfig : Config = context.system.settings.config
+  private val systemConfig: Config = context.system.settings.config
   private implicit val timeout = Constants.FUTURE_TIMEOUT
   private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService")
-  // resources and resourceRequests can be dynamically constructed by
+  // Resources and resourceRequests can be dynamically constructed by
   // heartbeat of worker and appmaster when master singleton is migrated.
-  // we don't need to persist them in cluster
+  // We don't need to persist them in cluster
+  private var appManager: ActorRef = null
 
-  private var appManager : ActorRef = null
-
-  private var scheduler : ActorRef = null
+  private var scheduler: ActorRef = null
 
   private var workers = new immutable.HashMap[ActorRef, WorkerId]
 
@@ -74,16 +71,16 @@ private[cluster] class Master extends Actor with Stash {
 
   def receive: Receive = null
 
-  // register jvm metrics
+  // Register jvm metrics
   Metrics(context.system).register(new JvmMetricsSet(s"master"))
 
   LOG.info("master is started at " + ActorUtil.getFullPath(context.system, self.path) + "...")
 
   val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
 
-  val jarStore = if(Util.isLocalPath(jarStoreRootPath)) {
+  private val jarStore = if (Util.isLocalPath(jarStoreRootPath)) {
     Some(context.actorOf(Props(classOf[LocalJarStore], jarStoreRootPath)))
-  } else{
+  } else {
     None
   }
 
@@ -97,7 +94,8 @@ private[cluster] class Master extends Actor with Stash {
       context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig)))
     }
 
-    val metricsReportService = context.actorOf(Props(new MetricsReporterService(Metrics(context.system))))
+    val metricsReportService = context.actorOf(
+      Props(new MetricsReporterService(Metrics(context.system))))
     historyMetricsService.tell(ReportMetrics, metricsReportService)
     Some(historyMetricsService)
   } else {
@@ -109,7 +107,7 @@ private[cluster] class Master extends Actor with Stash {
 
   def waitForNextWorkerId: Receive = {
     case GetKVSuccess(_, result) =>
-      if(result != null) {
+      if (result != null) {
         this.nextWorkerId = result.asInstanceOf[Int]
       } else {
         LOG.warn("Cannot find existing state in the distributed cluster...")
@@ -124,7 +122,7 @@ private[cluster] class Master extends Actor with Stash {
       stash()
   }
 
-  def receiveHandler : Receive = workerMsgHandler orElse
+  def receiveHandler: Receive = workerMsgHandler orElse
     appMasterMsgHandler orElse
     clientMsgHandler orElse
     metricsService orElse
@@ -134,7 +132,7 @@ private[cluster] class Master extends Actor with Stash {
     kvServiceMsgHandler orElse
     ActorUtil.defaultMsgHandler(self)
 
-  def workerMsgHandler : Receive = {
+  def workerMsgHandler: Receive = {
     case RegisterNewWorker =>
       val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
       nextWorkerId += 1
@@ -150,41 +148,42 @@ private[cluster] class Master extends Actor with Stash {
       workers += (sender() -> id)
       val workerHostname = ActorUtil.getHostname(sender())
       LOG.info(s"Register Worker with id $id from $workerHostname ....")
-    case resourceUpdate : ResourceUpdate =>
+    case resourceUpdate: ResourceUpdate =>
       scheduler forward resourceUpdate
   }
 
-  def jarStoreService : Receive = {
+  def jarStoreService: Receive = {
     case GetJarStoreServer =>
       jarStore.foreach(_ forward GetJarStoreServer)
   }
 
   def kvServiceMsgHandler: Receive = {
     case PutKVSuccess =>
-      //Skip
+    // Skip
     case PutKVFailed(key, exception) =>
-      LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" + ExceptionUtils.getStackTrace(exception))
+      LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" +
+        ExceptionUtils.getStackTrace(exception))
   }
 
-  def metricsService : Receive = {
+  def metricsService: Receive = {
     case query: QueryHistoryMetrics =>
       if (historyMetricsService.isEmpty) {
-        // return empty metrics so that we don't hang the UI
+        // Returns empty metrics so that we don't hang the UI
         sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
       } else {
         historyMetricsService.get forward query
       }
   }
 
-  def appMasterMsgHandler : Receive = {
-    case  request : RequestResource =>
+  def appMasterMsgHandler: Receive = {
+    case request: RequestResource =>
       scheduler forward request
-    case registerAppMaster : RegisterAppMaster =>
-      //forward to appManager
+    case registerAppMaster: RegisterAppMaster =>
+      // Forward to appManager
       appManager forward registerAppMaster
-    case save : SaveAppData =>
+    case save: SaveAppData =>
       appManager forward save
-    case get : GetAppData =>
+    case get: GetAppData =>
       appManager forward get
     case GetAllWorkers =>
       sender ! WorkerList(workers.values.toList)
@@ -219,7 +218,7 @@ private[cluster] class Master extends Actor with Stash {
 
     if (cluster.isEmpty) {
 
-      //add myself into the list if it is a single node cluster
+      // Add myself into the list if it is a single node cluster
       List(hostPort)
     } else {
       cluster
@@ -228,18 +227,18 @@ private[cluster] class Master extends Actor with Stash {
 
   import scala.util.{Failure, Success}
 
-  def clientMsgHandler : Receive = {
-    case app : SubmitApplication =>
+  def clientMsgHandler: Receive = {
+    case app: SubmitApplication =>
       LOG.debug(s"Receive from client, SubmitApplication $app")
       appManager.forward(app)
-    case app : RestartApplication =>
+    case app: RestartApplication =>
       LOG.debug(s"Receive from client, RestartApplication $app")
       appManager.forward(app)
-    case app : ShutdownApplication =>
+    case app: ShutdownApplication =>
       LOG.debug(s"Receive from client, Shutting down Application ${app.appId}")
       scheduler ! ApplicationFinished(app.appId)
       appManager.forward(app)
-    case app : ResolveAppId =>
+    case app: ResolveAppId =>
       LOG.debug(s"Receive from client, resolving appId ${app.appId} to ActorRef")
       appManager.forward(app)
     case resolve: ResolveWorkerId =>
@@ -247,7 +246,8 @@ private[cluster] class Master extends Actor with Stash {
       val worker = workers.find(_._2 == resolve.workerId)
       worker match {
         case Some(worker) => sender ! ResolveWorkerIdResult(Success(worker._1))
-        case None => sender ! ResolveWorkerIdResult(Failure(new Exception(s"cannot find worker ${resolve.workerId}")))
+        case None => sender ! ResolveWorkerIdResult(Failure(
+          new Exception(s"cannot find worker ${resolve.workerId}")))
       }
     case AppMastersDataRequest =>
       LOG.debug("Master received AppMastersDataRequest")
@@ -262,19 +262,20 @@ private[cluster] class Master extends Actor with Stash {
       sender ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
   }
 
-  def disassociated : Receive = {
-    case disassociated : DisassociatedEvent =>
+  def disassociated: Receive = {
+    case disassociated: DisassociatedEvent =>
       LOG.info(s" disassociated ${disassociated.remoteAddress}")
-      //LOG.info(s"remote lifecycle events are "+systemConfig.getString("akka.remote.log-remote-lifecycle-events"))
   }
 
-  def terminationWatch : Receive = {
-    case t : Terminated =>
+  def terminationWatch: Receive = {
+    case t: Terminated =>
       val actor = t.actor
-      LOG.info(s"worker ${actor.path} get terminated, is it due to network reason? ${t.getAddressTerminated()}")
+      LOG.info(s"worker ${actor.path} get terminated, is it due to network reason?" +
+        t.getAddressTerminated())
+
       LOG.info("Let's filter out dead resources...")
-      // filter out dead worker resource
-      if(workers.keySet.contains(actor)){
+      // Filters out dead worker resource
+      if (workers.keySet.contains(actor)) {
         scheduler ! WorkerTerminated(workers.get(actor).get)
         workers -= actor
       }
@@ -283,9 +284,11 @@ private[cluster] class Master extends Actor with Stash {
   override def preStart(): Unit = {
     val path = ActorUtil.getFullPath(context.system, self.path)
     LOG.info(s"master path is $path")
-    val schedulerClass = Class.forName(systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER))
+    val schedulerClass = Class.forName(
+      systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER))
 
-    appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)), classOf[AppManager].getSimpleName)
+    appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)),
+      classOf[AppManager].getSimpleName)
     scheduler = context.actorOf(Props(schedulerClass))
     context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])
   }
@@ -298,10 +301,10 @@ object Master {
 
   case class WorkerTerminated(workerId: WorkerId)
 
-  case class MasterInfo(master: ActorRef, startTime : Long = 0L)
+  case class MasterInfo(master: ActorRef, startTime: Long = 0L)
 
   object MasterInfo {
-    def empty = MasterInfo(null)
+    def empty: MasterInfo = MasterInfo(null)
   }
 
   case class SlotStatus(totalSlots: Int, availableSlots: Int)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
index 3b1bd9f..5df008e 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,23 +18,26 @@
 
 package io.gearpump.cluster.scheduler
 
+import scala.collection.mutable
+
 import akka.actor.ActorRef
-import io.gearpump.WorkerId
+
 import io.gearpump.cluster.AppMasterToMaster.RequestResource
 import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
 import io.gearpump.cluster.scheduler.Relaxation._
 import io.gearpump.cluster.scheduler.Scheduler.PendingRequest
+import io.gearpump.cluster.worker.WorkerId
 
-import scala.collection.mutable
-
+/** Assigns resources to applications based on application priority */
 class PriorityScheduler extends Scheduler {
   private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)
 
-  def requestOrdering = new Ordering[PendingRequest] {
-    override def compare(x: PendingRequest, y: PendingRequest) = {
+  def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] {
+    override def compare(x: PendingRequest, y: PendingRequest): Int = {
       var res = x.request.priority.id - y.request.priority.id
-      if (res == 0)
+      if (res == 0) {
         res = y.timeStamp.compareTo(x.timeStamp)
+      }
       res
     }
   }
@@ -59,8 +62,10 @@ class PriorityScheduler extends Scheduler {
           if (newAllocated < request.resource) {
             val remainingRequest = request.resource - newAllocated
             val remainingExecutors = request.executorNum - allocations.length
-            val newResourceRequest = request.copy(resource = remainingRequest, executorNum = remainingExecutors)
-            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
+            val newResourceRequest = request.copy(resource = remainingRequest,
+              executorNum = remainingExecutors)
+            scheduleLater = scheduleLater :+
+              PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
           }
           allocated = allocated + newAllocated
         case ONEWORKER =>
@@ -71,7 +76,8 @@ class PriorityScheduler extends Scheduler {
           if (availableResource.nonEmpty) {
             val (workerId, (worker, resource)) = availableResource.get
             allocated = allocated + request.resource
-            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker, workerId)))
+            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
+              workerId)))
             resourcesSnapShot.update(workerId, (worker, resource - request.resource))
           } else {
             scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
@@ -80,24 +86,27 @@ class PriorityScheduler extends Scheduler {
           val workerAndResource = resourcesSnapShot.get(request.workerId)
           if (workerAndResource.nonEmpty && workerAndResource.get._2 > request.resource) {
             val (worker, availableResource) = workerAndResource.get
-            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker, request.workerId)))
+            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
+              request.workerId)))
             allocated = allocated + request.resource
-            resourcesSnapShot.update(request.workerId, (worker, availableResource - request.resource))
+            resourcesSnapShot.update(request.workerId, (worker,
+              availableResource - request.resource))
           } else {
             scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
           }
       }
     }
-    for(request <- scheduleLater)
+    for (request <- scheduleLater)
       resourceRequests.enqueue(request)
   }
 
   def resourceRequestHandler: Receive = {
     case RequestResource(appId, request) =>
-      LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, relaxation: ${request.relaxation}," +
-        s" executor number: ${request.executorNum}")
+      LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, " +
+        s"relaxation: ${request.relaxation}, executor number: ${request.executorNum}")
       val appMaster = sender()
-      resourceRequests.enqueue(new PendingRequest(appId, appMaster, request, System.currentTimeMillis()))
+      resourceRequests.enqueue(new PendingRequest(appId, appMaster, request,
+        System.currentTimeMillis()))
       allocateResource()
   }
 
@@ -105,7 +114,9 @@ class PriorityScheduler extends Scheduler {
     resourceRequests = resourceRequests.filter(_.appId != appId)
   }
 
-  private def allocateFairly(resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest): List[ResourceAllocation] = {
+  private def allocateFairly(
+      resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest)
+    : List[ResourceAllocation] = {
     val workerNum = resources.size
     var allocations = List.empty[ResourceAllocation]
     var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
@@ -116,13 +127,16 @@ class PriorityScheduler extends Scheduler {
       val exeutorNum = Math.min(workerNum, remainingExecutors)
       val toRequest = Resource(remainingRequest.slots * exeutorNum / remainingExecutors)
 
-      val flattenResource = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse).take(exeutorNum).zipWithIndex.flatMap { workerWithIndex =>
+      val sortedResources = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse)
+      val pickedResources = sortedResources.take(exeutorNum)
+
+      val flattenResource = pickedResources.zipWithIndex.flatMap { workerWithIndex =>
         val ((workerId, (worker, resource)), index) = workerWithIndex
         0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
       }.sortBy(_._2).map(_._1)
 
       if (flattenResource.length < toRequest.slots) {
-        //Can not safisfy the user's requirements
+        // Cannot satisfy the user's requirements
         totalAvailable = Resource.empty
       } else {
         flattenResource.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length).
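
For context on the behaviour being reformatted above: PriorityScheduler keeps pending requests in a mutable.PriorityQueue whose ordering puts higher-priority applications first and, within the same priority, older requests first. A small self-contained sketch of that ordering pattern, with illustrative names rather than the Gearpump classes:

    import scala.collection.mutable

    object OrderingSketch extends App {
      case class Pending(priority: Int, timeStamp: Long)

      // Higher priority compares as greater; for equal priorities the older
      // request (smaller timestamp) compares as greater, so it is served first.
      val ordering: Ordering[Pending] = new Ordering[Pending] {
        override def compare(x: Pending, y: Pending): Int = {
          val byPriority = x.priority - y.priority
          if (byPriority != 0) byPriority else y.timeStamp.compareTo(x.timeStamp)
        }
      }

      val queue = new mutable.PriorityQueue[Pending]()(ordering)
      queue.enqueue(Pending(priority = 1, timeStamp = 200L))
      queue.enqueue(Pending(priority = 2, timeStamp = 100L))

      // PriorityQueue dequeues the greatest element under the Ordering,
      // so the priority-2 request comes out first.
      println(queue.dequeue()) // Pending(2,100)
    }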

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
index 8ccf1fb..ccd105f 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,47 +17,48 @@
  */
 package io.gearpump.cluster.scheduler
 
+import scala.collection.mutable
+
 import akka.actor.{Actor, ActorRef}
-import io.gearpump.cluster.MasterToWorker.UpdateResourceSucceed
-import io.gearpump.util.LogUtil
-import io.gearpump.{WorkerId, TimeStamp}
+import org.slf4j.Logger
+
+import io.gearpump.TimeStamp
 import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
 import io.gearpump.cluster.WorkerToMaster.ResourceUpdate
 import io.gearpump.cluster.master.Master.WorkerTerminated
 import io.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-
-import scala.collection.mutable
 
 /**
  * Scheduler schedule resource for different applications.
  */
-abstract class Scheduler extends Actor{
+abstract class Scheduler extends Actor {
   val LOG: Logger = LogUtil.getLogger(getClass)
   protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]
 
-  def handleScheduleMessage : Receive = {
+  def handleScheduleMessage: Receive = {
     case WorkerRegistered(id, _) =>
-      if(!resources.contains(id)) {
+      if (!resources.contains(id)) {
         LOG.info(s"Worker $id added to the scheduler")
         resources.put(id, (sender, Resource.empty))
       }
     case update@ResourceUpdate(worker, workerId, resource) =>
       LOG.info(s"$update...")
-      if(resources.contains(workerId)) {
+      if (resources.contains(workerId)) {
         val resourceReturned = resource > resources.get(workerId).get._2
         resources.update(workerId, (worker, resource))
-        if(resourceReturned){
+        if (resourceReturned) {
           allocateResource()
         }
         sender ! UpdateResourceSucceed
       }
       else {
-        sender ! UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId has not been registered into master")
+        sender ! UpdateResourceFailed(
+          s"ResourceUpdate failed! The worker $workerId has not been registered into master")
       }
     case WorkerTerminated(workerId) =>
-      if(resources.contains(workerId)){
+      if (resources.contains(workerId)) {
         resources -= workerId
       }
     case ApplicationFinished(appId) =>
@@ -69,7 +70,9 @@ abstract class Scheduler extends Actor{
   def doneApplication(appId: Int): Unit
 }
 
-object Scheduler{
-  case class PendingRequest(appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp)
+object Scheduler {
+  case class PendingRequest(
+      appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp)
+
   case class ApplicationFinished(appId: Int)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
index 08a342e..f97a209 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,15 +20,18 @@ package io.gearpump.cluster.worker
 import java.io.File
 
 import com.typesafe.config.Config
+import org.slf4j.Logger
+
 import io.gearpump.cluster.scheduler.Resource
 import io.gearpump.util.{LogUtil, RichProcess, Util}
-import org.slf4j.Logger
 
+/** Launcher to start an executor process */
 class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
-  override def createProcess(appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
-    classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
+  override def createProcess(
+      appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
+      classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
 
     LOG.info(s"Launch executor, classpath: ${classPath.mkString(File.pathSeparator)}")
     Util.startProcess(options, classPath, mainClass, arguments)

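Note: the launcher change above only adds a one-line scaladoc and re-wraps the createProcess signature; the body still delegates to Util.startProcess(options, classPath, mainClass, arguments). As a rough, assumed sketch of what such a launcher does under the hood (not the actual Util.startProcess implementation), the child JVM command line can be assembled with a plain ProcessBuilder:

    import java.io.File

    object LaunchSketch {
      // Assemble a `java` command line from JVM options, classpath, main class and
      // arguments, then spawn the child process with inherited stdio.
      def launch(
          options: Array[String], classPath: Array[String], mainClass: String,
          arguments: Array[String]): Process = {
        val java = System.getProperty("java.home") + File.separator + "bin" +
          File.separator + "java"
        val command = List(java) ++ options ++
          List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ arguments
        new ProcessBuilder(command: _*).inheritIO().start()
      }
    }
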
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
index a746b39..1c22b05 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,34 +22,33 @@ import java.io.File
 import java.lang.management.ManagementFactory
 import java.net.URL
 import java.util.concurrent.{Executors, TimeUnit}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
 
 import akka.actor.SupervisorStrategy.Stop
 import akka.actor._
 import com.typesafe.config.{Config, ConfigFactory}
-import io.gearpump.WorkerId
+import org.slf4j.Logger
+
 import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
 import io.gearpump.cluster.AppMasterToWorker._
 import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
-import io.gearpump.cluster.worker.Worker.ExecutorWatcher
-import io.gearpump.cluster.{ExecutorJVMConfig, ClusterConfig}
 import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig}
 import io.gearpump.cluster.MasterToWorker._
 import io.gearpump.cluster.WorkerToAppMaster._
 import io.gearpump.cluster.WorkerToMaster._
 import io.gearpump.cluster.master.Master.MasterInfo
 import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.cluster.worker.Worker.ExecutorWatcher
+import io.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
 import io.gearpump.jarstore.JarStoreService
 import io.gearpump.metrics.Metrics.ReportMetrics
 import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
 import io.gearpump.util.ActorSystemBooter.Daemon
 import io.gearpump.util.Constants._
 import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-import io.gearpump.util.{Constants, TimeOutScheduler, _}
-import org.slf4j.Logger
-
-import scala.concurrent.{Future, Promise, ExecutionContext}
-import scala.concurrent.duration._
-import scala.util.{Try, Success, Failure}
+import io.gearpump.util.{TimeOutScheduler, _}
 
 /**
  * Worker is used to track the resources on a single machine, it is like
@@ -57,8 +56,8 @@ import scala.util.{Try, Success, Failure}
  *
  * @param masterProxy masterProxy is used to resolve the master
  */
-private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOutScheduler{
-  private val systemConfig : Config = context.system.settings.config
+private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
+  private val systemConfig: Config = context.system.settings.config
 
   private val address = ActorUtil.getFullPath(context.system, self.path)
   private var resource = Resource.empty
@@ -73,14 +72,14 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
   jarStoreService.init(systemConfig, context.system)
 
   private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
-  private val resourceUpdateTimeoutMs = 30000 //milliseconds
+  private val resourceUpdateTimeoutMs = 30000 // Milliseconds
 
   private var totalSlots: Int = 0
 
   val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
   var historyMetricsService: Option[ActorRef] = None
 
-  override def receive : Receive = null
+  override def receive: Receive = null
   var LOG: Logger = LogUtil.getLogger(getClass)
 
   def service: Receive =
@@ -90,10 +89,10 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
       terminationWatch(masterInfo.master) orElse
       ActorUtil.defaultMsgHandler(self)
 
-  def metricsService : Receive = {
+  def metricsService: Receive = {
     case query: QueryHistoryMetrics =>
       if (historyMetricsService.isEmpty) {
-        // return empty metrics so that we don't hang the UI
+        // Returns empty metrics so that we don't hang the UI
         sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
       } else {
         historyMetricsService.get forward query
@@ -104,8 +103,8 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
 
   val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
 
-  private def initializeMetrics: Unit = {
-    // register jvm metrics
+  private def initializeMetrics(): Unit = {
+    // Registers jvm metrics
     Metrics(context.system).register(new JvmMetricsSet(s"worker${id}"))
 
     historyMetricsService = if (metricsEnabled) {
@@ -113,7 +112,8 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
         context.actorOf(Props(new HistoryMetricsService("worker" + id, getHistoryMetricsConfig)))
       }
 
-      val metricsReportService = context.actorOf(Props(new MetricsReporterService(Metrics(context.system))))
+      val metricsReportService = context.actorOf(Props(
+        new MetricsReporterService(Metrics(context.system))))
       historyMetricsService.tell(ReportMetrics, metricsReportService)
       Some(historyMetricsService)
     } else {
@@ -121,25 +121,27 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
     }
   }
 
-  def waitForMasterConfirm(killSelf : Cancellable) : Receive = {
+  def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {
 
     // If the master gets disconnected, WorkerRegistered may be triggered multiple times.
     case WorkerRegistered(id, masterInfo) =>
       this.id = id
 
-      // Add the flag check, so that we don't re-initialize when WorkerRegistered
-      // is triggered multiple times.
+      // Adds the flag check so that we don't re-initialize the metrics when the worker
+      // re-registers itself.
       if (!metricsInitialized) {
-        initializeMetrics
+        initializeMetrics()
         metricsInitialized = true
       }
 
       this.masterInfo = masterInfo
-      killSelf.cancel()
+      timeoutTicker.cancel()
       context.watch(masterInfo.master)
       this.LOG = LogUtil.getLogger(getClass, worker = id)
-      LOG.info(s"Worker is registered. actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
-      sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
+      LOG.info(s"Worker is registered. " +
+        s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
+      sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource),
+        resourceUpdateTimeoutMs, updateResourceTimeOut())
       context.become(service)
   }
 
@@ -147,31 +149,33 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
     LOG.error(s"Update worker resource time out")
   }
 
-  def appMasterMsgHandler : Receive = {
-    case shutdown @ ShutdownExecutor(appId, executorId, reason : String) =>
+  def appMasterMsgHandler: Receive = {
+    case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
       val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
       val executorToStop = executorNameToActor.get(actorName)
       if (executorToStop.isDefined) {
-        LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) due to: $reason")
+        LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) " +
+          s"due to: $reason")
         executorToStop.get.forward(shutdown)
       } else {
         LOG.error(s"Cannot find executor $actorName, ignore this message")
         sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
       }
-    case launch : LaunchExecutor =>
+    case launch: LaunchExecutor =>
       LOG.info(s"$launch")
       if (resource < launch.resource) {
         sender ! ExecutorLaunchRejected("There is no free resource on this machine")
       } else {
         val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
 
-        val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool, jarStoreService, executorProcLauncher))
-        executorNameToActor += actorName ->executor
+        val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
+          jarStoreService, executorProcLauncher))
+        executorNameToActor += actorName -> executor
 
         resource = resource - launch.resource
         allocatedResources = allocatedResources + (executor -> launch.resource)
 
-        reportResourceToMaster
+        reportResourceToMaster()
         executorsInfo += executor ->
           ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
         context.watch(executor)
@@ -195,26 +199,29 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
         resource.slots,
         userDir,
         jvmName = ManagementFactory.getRuntimeMXBean().getName(),
-        resourceManagerContainerId = systemConfig.getString(Constants.GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
+        resourceManagerContainerId = systemConfig.getString(
+          GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
         historyMetricsConfig = getHistoryMetricsConfig)
       )
     case ChangeExecutorResource(appId, executorId, usedResource) =>
       for (executor <- executorActorRef(appId, executorId);
-           allocatedResource <- allocatedResources.get(executor)) {
+        allocatedResource <- allocatedResources.get(executor)) {
+
         allocatedResources += executor -> usedResource
         resource = resource + allocatedResource - usedResource
-        reportResourceToMaster
+        reportResourceToMaster()
 
         if (usedResource == Resource(0)) {
           allocatedResources -= executor
          // Stop the executor if there is no resource bound to it.
           LOG.info(s"Shutdown executor $executorId because the resource used is zero")
-          executor ! ShutdownExecutor(appId, executorId, "Shutdown executor because the resource used is zero")
+          executor ! ShutdownExecutor(appId, executorId,
+            "Shutdown executor because the resource used is zero")
         }
       }
   }
 
-  private def reportResourceToMaster: Unit = {
+  private def reportResourceToMaster(): Unit = {
     sendMsgWithTimeOutCallBack(masterInfo.master,
       ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
   }
@@ -233,14 +240,28 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
       }
   }
 
-  def terminationWatch(master : ActorRef) : Receive = {
+  private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
+    repeatActionUtil(
+      seconds = timeOutSeconds,
+      action = () => {
+        masterProxy ! RegisterWorker(workerId)
+      },
+      onTimeout = () => {
+        LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
+          s"seconds, abort and kill the worker...")
+        self ! PoisonPill
+      })
+  }
+
+  def terminationWatch(master: ActorRef): Receive = {
     case Terminated(actor) =>
       if (actor.compareTo(master) == 0) {
-        // parent is down, let's make suicide
+        // The parent master is down; there is no point in keeping the worker bound to it.
+        // Try to re-register with a new master, or terminate to free resources.
         LOG.info(s"Master cannot be contacted, find a new master ...")
-        context.become(waitForMasterConfirm(repeatActionUtil(30)(masterProxy ! RegisterWorker(id))))
+        context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
       } else if (ActorUtil.isChildActorPath(self, actor)) {
-        //one executor is down,
+        // One executor is down,
         LOG.info(s"Executor is down ${getExecutorName(actor)}")
 
         val allocated = allocatedResources.get(actor)
@@ -248,68 +269,81 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
           resource = resource + allocated.get
           executorsInfo -= actor
           allocatedResources = allocatedResources - actor
-          sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
+          sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
+            resourceUpdateTimeoutMs, updateResourceTimeOut())
         }
       }
   }
 
-  private def getExecutorName(actorRef: ActorRef):  Option[String] = {
+  private def getExecutorName(actorRef: ActorRef): Option[String] = {
     executorNameToActor.find(_._2 == actorRef).map(_._1)
   }
 
   private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
-    val launcherClazz = Class.forName(systemConfig.getString(Constants.GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
-    launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig).asInstanceOf[ExecutorProcessLauncher]
+    val launcherClazz = Class.forName(
+      systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
+    launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig)
+      .asInstanceOf[ExecutorProcessLauncher]
   }
 
   import context.dispatcher
-  override def preStart() : Unit = {
+  override def preStart(): Unit = {
     LOG.info(s"RegisterNewWorker")
-    totalSlots = systemConfig.getInt(Constants.GEARPUMP_WORKER_SLOTS)
+    totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
     this.resource = Resource(totalSlots)
     masterProxy ! RegisterNewWorker
-    context.become(waitForMasterConfirm(repeatActionUtil(30)(Unit)))
+    context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
   }
 
-  private def repeatActionUtil(seconds: Int)(action : => Unit) : Cancellable = {
-    val cancelSend = context.system.scheduler.schedule(Duration.Zero, Duration(2, TimeUnit.SECONDS))(action)
-    val cancelSuicide = context.system.scheduler.scheduleOnce(FiniteDuration(seconds, TimeUnit.SECONDS), self, PoisonPill)
-    return new Cancellable {
+  private def registerTimeoutTicker(seconds: Int): Cancellable = {
+    repeatActionUtil(seconds, () => Unit, () => {
+      LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
+        s"abort and kill the worker...")
+      self ! PoisonPill
+    })
+  }
+
+  private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
+    : Cancellable = {
+    val cancelTimeout = context.system.scheduler.schedule(Duration.Zero,
+      Duration(2, TimeUnit.SECONDS))(action())
+    val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout())
+    new Cancellable {
       def cancel(): Boolean = {
-        val result1 = cancelSend.cancel()
+        val result1 = cancelTimeout.cancel()
         val result2 = cancelSuicide.cancel()
         result1 && result2
       }
 
       def isCancelled: Boolean = {
-        cancelSend.isCancelled && cancelSuicide.isCancelled
+        cancelTimeout.isCancelled && cancelSuicide.isCancelled
       }
     }
   }
 
-  override def postStop : Unit = {
+  override def postStop(): Unit = {
     LOG.info(s"Worker is going down....")
     ioPool.shutdown()
-    context.system.shutdown()
+    context.system.terminate()
   }
 }
 
 private[cluster] object Worker {
 
-  case class ExecutorResult(result : Try[Int])
+  case class ExecutorResult(result: Try[Int])
 
   class ExecutorWatcher(
-    launch: LaunchExecutor,
-    masterInfo: MasterInfo,
-    ioPool: ExecutionContext,
-    jarStoreService: JarStoreService,
-    procLauncher: ExecutorProcessLauncher) extends Actor {
+      launch: LaunchExecutor,
+      masterInfo: MasterInfo,
+      ioPool: ExecutionContext,
+      jarStoreService: JarStoreService,
+      procLauncher: ExecutorProcessLauncher) extends Actor {
     import launch.{appId, executorId, resource}
 
     val executorConfig: Config = {
       val workerConfig = context.system.settings.config
 
-      val submissionConfig = Option(launch.executorJvmConfig).flatMap{ jvmConfig =>
+      val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig =>
         Option(jvmConfig.executorAkkaConfig)
       }.getOrElse(ConfigFactory.empty())
 
@@ -319,15 +353,15 @@ private[cluster] object Worker {
     // For some configs the worker config takes priority; for others, the user's application
     // submission config takes priority.
     private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
-      val config = submissionConfig.withoutPath(Constants.GEARPUMP_HOSTNAME)
-        .withoutPath(Constants.GEARPUMP_CLUSTER_MASTERS)
-        .withoutPath(Constants.GEARPUMP_HOME)
-        .withoutPath(Constants.GEARPUMP_LOG_DAEMON_DIR)
+      val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME)
+        .withoutPath(GEARPUMP_CLUSTER_MASTERS)
+        .withoutPath(GEARPUMP_HOME)
+        .withoutPath(GEARPUMP_LOG_DAEMON_DIR)
         .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
-        // fall back to workerConfig
+        // Falls back to workerConfig
         .withFallback(workerConfig)
 
-      // we should exclude reference.conf, and JVM properties..
+      // Excludes reference.conf and JVM properties.
       ClusterConfig.filterOutDefaultConfig(config)
     }
 
@@ -343,10 +377,10 @@ private[cluster] object Worker {
           val exitPromise = Promise[Int]()
           val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))
 
-          override def destroy = {
+          override def destroy(): Unit = {
             context.stop(app)
           }
-          override def exitValue : Future[Int] = {
+          override def exitValue: Future[Int] = {
             exitPromise.future
           }
         }
@@ -376,31 +410,30 @@ private[cluster] object Worker {
           ctx.classPath.map(path => expandEnviroment(path)) ++
           jarPath.map(Array(_)).getOrElse(Array.empty[String])
 
-
-        val appLogDir = executorConfig.getString(Constants.GEARPUMP_LOG_APPLICATION_DIR)
+        val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
         val logArgs = List(
-          s"-D${Constants.GEARPUMP_APPLICATION_ID}=${launch.appId}",
-          s"-D${Constants.GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
-          s"-D${Constants.GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}",
-          s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
-        val configArgs =List(s"-D${Constants.GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")
+          s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
+          s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
+          s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}",
+          s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
+        val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")
 
-        val username = List(s"-D${Constants.GEARPUMP_USERNAME}=${ctx.username}")
+        val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")
 
-        //remote debug executor process
-        val remoteDebugFlag = executorConfig.getBoolean(Constants.GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
+        // Remote debug executor process
+        val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
         val remoteDebugConfig = if (remoteDebugFlag) {
-          val availablePort = Util.findFreePort.get
+          val availablePort = Util.findFreePort().get
           List(
             "-Xdebug",
             s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
-            s"-D${Constants.GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
+            s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
           )
         } else {
           List.empty[String]
         }
 
-        val verboseGCFlag = executorConfig.getBoolean(Constants.GEARPUMP_VERBOSE_GC)
+        val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
         val verboseGCConfig = if (verboseGCFlag) {
           List(
             s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
@@ -431,7 +464,7 @@ private[cluster] object Worker {
 
         var destroyed = false
 
-        override def destroy: Unit = {
+        override def destroy(): Unit = {
           LOG.info(s"Destroy executor process ${ctx.mainClass}")
           if (!destroyed) {
             destroyed = true
@@ -449,37 +482,38 @@ private[cluster] object Worker {
             if (exit == 0) {
               Future.successful(0)
             } else {
-              Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, error summary: ${info.process.logger.error}"))
+              Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
+              s"error summary: ${info.process.logger.error}"))
             }
           }
         }
       }
     }
 
-    import Constants._
     private def expandEnviroment(path: String): String = {
-      //TODO: extend this to support more environment.
+      // TODO: extend this to support more environment variables.
       path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME))
     }
 
-    override def preStart: Unit = {
-      executorHandler.exitValue.onComplete{value =>
+    override def preStart(): Unit = {
+      executorHandler.exitValue.onComplete { value =>
         procLauncher.cleanProcess(appId, executorId)
         val result = ExecutorResult(value)
         self ! result
       }
     }
 
-    override def postStop: Unit = {
-      executorHandler.destroy
+    override def postStop(): Unit = {
+      executorHandler.destroy()
     }
 
-    //The folders are under ${GEARPUMP_HOME}
-    val daemonPathPattern = List("lib" + File.separator + "daemon", "lib" + File.separator + "yarn")
+    // The folders are under ${GEARPUMP_HOME}
+    val daemonPathPattern = List("lib" + File.separator + "daemon", "lib" +
+      File.separator + "yarn")
 
     override def receive: Receive = {
-      case ShutdownExecutor(appId, executorId, reason : String) =>
-        executorHandler.destroy
+      case ShutdownExecutor(appId, executorId, reason: String) =>
+        executorHandler.destroy()
         sender ! ShutdownExecutorSucceed(appId, executorId)
         context.stop(self)
       case ExecutorResult(executorResult) =>
@@ -506,29 +540,28 @@ private[cluster] object Worker {
   }
 
   trait ExecutorHandler {
-    def destroy : Unit
-    def exitValue : Future[Int]
+    def destroy(): Unit
+    def exitValue: Future[Int]
   }
 
   case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)
 
   /**
-    * We will start the executor in  the same JVM as worker.
-    * @param launch
-    * @param exit
-    */
-  class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int]) extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
+   * Starts the executor in the same JVM as the worker.
+   */
+  class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
+    extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
     private val exitCode = 0
 
     override val supervisorStrategy =
-      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
+      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
         case ex: Throwable =>
           LOG.error(s"system $name stopped ", ex)
           exit.failure(ex)
           Stop
       }
 
-    override def postStop : Unit = {
+    override def postStop(): Unit = {
       if (!exit.isCompleted) {
         exit.success(exitCode)
       }

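Note: most of the Worker changes are line wrapping, import ordering and small API touch-ups (for example, context.system.shutdown() becoming context.system.terminate()), but the registration path is genuinely reworked: the old repeatActionUtil(seconds)(action) is replaced by repeatActionUtil(seconds, action, onTimeout), so the retry loop and the give-up behaviour (logging and PoisonPill) are now passed explicitly. The composite Cancellable it returns is a small idiom of its own: one schedule repeats the action, another fires the timeout once, and both are cancelled together. A minimal, self-contained sketch of that idiom follows; the names are illustrative and only the pattern mirrors the Worker code.

    import scala.concurrent.duration._
    import akka.actor.{ActorSystem, Cancellable}

    object RetryWithTimeout {
      def schedule(system: ActorSystem, timeout: FiniteDuration)(
          action: () => Unit, onTimeout: () => Unit): Cancellable = {
        import system.dispatcher
        // Repeat the action every 2 seconds until cancelled...
        val repeat = system.scheduler.schedule(Duration.Zero, 2.seconds)(action())
        // ...and give up after the timeout if nobody cancelled us first.
        val giveUp = system.scheduler.scheduleOnce(timeout)(onTimeout())
        new Cancellable {
          def cancel(): Boolean = {
            val a = repeat.cancel()
            val b = giveUp.cancel()
            a && b
          }
          def isCancelled: Boolean = repeat.isCancelled && giveUp.isCancelled
        }
      }
    }
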
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala b/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala
index 94430ce..305bdc1 100644
--- a/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala
+++ b/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,17 @@
 package io.gearpump.jarstore.dfs
 
 import java.io.File
-import akka.actor.{ActorSystem, ActorRefFactory}
+
+import akka.actor.ActorSystem
 import com.typesafe.config.Config
-import org.apache.hadoop.fs.Path
-import io.gearpump.jarstore.{FilePath, JarStoreService}
-import io.gearpump.util.LogUtil
 import org.apache.hadoop.conf.Configuration
-import io.gearpump.util.Constants
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 import org.slf4j.Logger
 
+import io.gearpump.jarstore.{FilePath, JarStoreService}
+import io.gearpump.util.{Constants, LogUtil}
+
 /**
  * DFSJarStoreService stores the uploaded jars on HDFS
  */
@@ -46,7 +47,8 @@ class DFSJarStoreService extends JarStoreService {
   }
 
   /**
-    * This function will copy the remote file to local file system, called from client side.
+   * Copies the remote file to the local file system; called from the client side.
+   *
    * @param localFile The destination file path
    * @param remotePath The remote file path from JarStore
    */
@@ -60,6 +62,7 @@ class DFSJarStoreService extends JarStoreService {
 
   /**
   * This function copies the local file to the remote JarStore; it is called from the client side.
+   *
    * @param localFile The local file
    */
   override def copyFromLocal(localFile: File): FilePath = {

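Note: the DFSJarStoreService hunks only touch imports, scaladoc layout and the license header; the copy logic itself is unchanged. For readers unfamiliar with the service, the round trip it performs is the standard Hadoop FileSystem copy in both directions. Below is a hedged sketch under assumed names; the real service also applies FsPermission settings and reads its root path from configuration.

    import java.io.File
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object HdfsCopySketch {
      // Upload a local jar into a root directory on HDFS and return its remote path.
      def upload(localJar: File, rootPath: String): Path = {
        val fs = FileSystem.get(new Configuration())
        val remote = new Path(rootPath, localJar.getName)
        fs.copyFromLocalFile(new Path(localJar.getAbsolutePath), remote)
        remote
      }

      // Copy a jar stored on HDFS back to the local file system.
      def download(remote: Path, localFile: File): Unit = {
        val fs = FileSystem.get(new Configuration())
        fs.copyToLocalFile(remote, new Path(localFile.getAbsolutePath))
      }
    }
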
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala b/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala
index ec2104b..fa1a240 100644
--- a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala
+++ b/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,19 +20,17 @@ package io.gearpump.jarstore.local
 
 import java.io.File
 
-import akka.actor.{Actor, Props, Stash}
-import akka.pattern.{ask, pipe}
-import io.gearpump.cluster.ClientToMaster.GetJarStoreServer
-import io.gearpump.util._
-import io.gearpump.cluster.ClientToMaster.{JarStoreServerAddress, GetJarStoreServer}
+import akka.actor.{Actor, Stash}
+import akka.pattern.pipe
 import org.slf4j.Logger
 
-import scala.concurrent.Future
+import io.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
+import io.gearpump.util._
 
 /**
  * LocalJarStore stores the uploaded jar on the local disk.
  */
-class LocalJarStore(rootDirPath : String) extends Actor with Stash {
+class LocalJarStore(rootDirPath: String) extends Actor with Stash {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
   val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
@@ -47,7 +45,7 @@ class LocalJarStore(rootDirPath : String) extends Actor with Stash {
 
   server.start pipeTo self
 
-  def receive : Receive = {
+  def receive: Receive = {
     case FileServer.Port(port) =>
       context.become(listen(port))
       unstashAll()
@@ -55,7 +53,7 @@ class LocalJarStore(rootDirPath : String) extends Actor with Stash {
       stash()
   }
 
-  def listen(port : Int) : Receive = {
+  def listen(port: Int): Receive = {
     case GetJarStoreServer =>
       sender ! JarStoreServerAddress(s"http://$host:$port/")
   }
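
Note: LocalJarStore is unchanged in behaviour by this commit: it still stashes every message until the embedded FileServer reports its port, then switches to the listen(port) behaviour and replays the stash. A minimal sketch of that stash-until-ready pattern follows; ServerReady and Query are illustrative message types, not Gearpump classes.

    import akka.actor.{Actor, Stash}

    object StashUntilReady {
      case class ServerReady(port: Int)
      case object Query

      class JarStoreLike extends Actor with Stash {
        // Until the server port is known, park every other message.
        def receive: Receive = {
          case ServerReady(port) =>
            context.become(ready(port))
            unstashAll()
          case _ =>
            stash()
        }

        // Once ready, answer queries with the server address.
        def ready(port: Int): Receive = {
          case Query =>
            sender() ! s"http://localhost:$port/"
        }
      }
    }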