You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:58 UTC

[48/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/AppDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/AppDescription.scala b/core/src/main/scala/io/gearpump/cluster/AppDescription.scala
index e8af854..799c20a 100644
--- a/core/src/main/scala/io/gearpump/cluster/AppDescription.scala
+++ b/core/src/main/scala/io/gearpump/cluster/AppDescription.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,44 +18,64 @@
 
 package io.gearpump.cluster
 
+import scala.reflect.ClassTag
+
 import akka.actor.{Actor, ActorRef, ActorSystem}
 import com.typesafe.config.{Config, ConfigFactory}
+
 import io.gearpump.cluster.appmaster.WorkerInfo
 import io.gearpump.cluster.scheduler.Resource
 import io.gearpump.jarstore.FilePath
-import io.gearpump.util.Util
-
-import scala.reflect.ClassTag
 
 /**
  * This contains all information to run an application
  *
- * @param name: The name of this application
- * @param appMaster: The class name of AppMaster Actor
- * @param userConfig: user configuration.
- * @param clusterConfig: The user provided cluster config, it will override gear.conf when starting
- *  new applications. In most cases, you wouldnot need to change it. If you do need to change it,
- *  use ClusterConfigSource(filePath) to construct the object, while filePath points to the .conf file.
+ * @param name The name of this application
+ * @param appMaster The class name of AppMaster Actor
+ * @param userConfig user configuration.
+ * @param clusterConfig User provided cluster config, it overrides gear.conf when starting
+ *                      new applications. In most cases, you should not need to change it. If you do
+ *                      really need to change it, please use ClusterConfigSource(filePath) to
+ *                      construct the object, while filePath points to the .conf file.
  */
+case class AppDescription(
+    name: String, appMaster: String, userConfig: UserConfig,
+    clusterConfig: Config = ConfigFactory.empty())
 
-case class AppDescription(name : String, appMaster : String, userConfig: UserConfig, clusterConfig: Config = ConfigFactory.empty())
-
+/**
+ * Each job, streaming or not streaming, needs to provide an Application class.
+ * The master uses this class to start AppMaster.
+ */
 trait Application {
+
+  /** Name of this application, must be unique in the system */
   def name: String
+
+  /** Custom user configuration  */
   def userConfig(implicit system: ActorSystem): UserConfig
+
+  /**
+   * AppMaster class, must have a constructor like this:
+   * this(appContext: AppMasterContext, app: AppDescription)
+   */
   def appMaster: Class[_ <: ApplicationMaster]
 }
 
 object Application {
-  def apply[T <: ApplicationMaster](name: String, userConfig: UserConfig)(implicit tag: ClassTag[T]): Application = {
-    new DefaultApplication(name, userConfig, tag.runtimeClass.asInstanceOf[Class[_ <: ApplicationMaster]])
+  def apply[T <: ApplicationMaster](
+      name: String, userConfig: UserConfig)(implicit tag: ClassTag[T]): Application = {
+    new DefaultApplication(name, userConfig,
+      tag.runtimeClass.asInstanceOf[Class[_ <: ApplicationMaster]])
   }
 
-  class DefaultApplication(override val name: String, inputUserConfig: UserConfig, val appMaster: Class[_ <: ApplicationMaster]) extends Application {
+  class DefaultApplication(
+      override val name: String, inputUserConfig: UserConfig,
+      val appMaster: Class[_ <: ApplicationMaster]) extends Application {
     override def userConfig(implicit system: ActorSystem): UserConfig = inputUserConfig
   }
 
-  def ApplicationToAppDescription(app: Application)(implicit system: ActorSystem): AppDescription = {
+  def ApplicationToAppDescription(app: Application)(implicit system: ActorSystem)
+    : AppDescription = {
     val filterJvmReservedKeys = ClusterConfig.filterOutDefaultConfig(system.settings.config)
     AppDescription(app.name, app.appMaster.getName, app.userConfig, filterJvmReservedKeys)
   }
@@ -69,57 +89,57 @@ abstract class ApplicationMaster extends Actor
 /**
  * This contains context information when starting an AppMaster
  *
- * @param appId: application instance id assigned, it is unique in the cluster
- * @param username: The username who submitted this application
- * @param resource: Resouce allocated to start this AppMaster daemon. AppMaster are allowed to
- *                request more resource from Master.
- * @param appJar: application Jar. If the jar is already in classpath, then it can be None.
- * @param masterProxy: The proxy to master actor, it will bridge the messages between appmaster and master
- * @param registerData: The AppMaster are required to register this data back to Master by RegisterAppMaster
- *
+ * @param appId application instance id assigned, it is unique in the cluster
+ * @param username The username who submitted this application
+ * @param resource Resource allocated to start this AppMaster daemon. AppMaster is allowed to
+ *                 request more resource from Master.
+ * @param appJar application Jar. If the jar is already in classpath, then it can be None.
+ * @param masterProxy The proxy to master actor, it bridges the messages between appmaster
+ *                    and master
+ * @param registerData AppMaster is required to send this data to Master when doing
+ *                     RegisterAppMaster.
  */
 case class AppMasterContext(
-    appId : Int,
-    username : String,
-    resource : Resource,
+    appId: Int,
+    username: String,
+    resource: Resource,
     workerInfo: WorkerInfo,
-    appJar : Option[AppJar],
-    masterProxy : ActorRef,
-    registerData : AppMasterRegisterData)
+    appJar: Option[AppJar],
+    masterProxy: ActorRef,
+    registerData: AppMasterRegisterData)
 
 /**
  * Jar file container in the cluster
  *
- * @param name: A meaningful name to represent this jar
- * @param filePath: Where the jar file is stored.
+ * @param name A meaningful name to represent this jar
+ * @param filePath Where the jar file is stored.
  */
 case class AppJar(name: String, filePath: FilePath)
 
-
 /**
- * TODO: ExecutorContext doesn't belong here.
- * Need to move to other places
+ * Serves as the context to start an Executor JVM.
  */
-case class ExecutorContext(executorId : Int, worker: WorkerInfo, appId : Int, appName: String,
-                           appMaster : ActorRef, resource : Resource)
-
+// TODO: ExecutorContext doesn't belong to this package in logic.
+case class ExecutorContext(
+    executorId: Int, worker: WorkerInfo, appId: Int, appName: String,
+    appMaster: ActorRef, resource: Resource)
 
 /**
- * TODO: ExecutorJVMConfig doesn't belong here.
- * Need to move to other places
- */
-/**
- * @param classPath: When a worker create a executor, the parent worker's classpath will
- * be automatically inherited, the application jar will also be added to runtime
- * classpath automatically. Sometimes, you still want to add some extraclasspath,
- * you can do this by specify classPath option.
+ * JVM configurations to start an Executor JVM.
+ *
+ * @param classPath When executor is created by a worker JVM, executor automatically inherits
+ *                  parent worker's classpath. Sometimes, you still want to add some extra
+ *                  classpath, you can do this by specifying the classPath option.
  * @param jvmArguments java arguments like -Dxx=yy
  * @param mainClass Executor main class name like io.gearpump.xx.AppMaster
  * @param arguments Executor command line arguments
  * @param jar application jar
- * @param executorAkkaConfig Akka config used to initialize the actor system of this executor. It will
- * use io.gearpump.util.Constants.GEARPUMP_CUSTOM_CONFIG_FILE to pass the config to executor
- * process
- *
+ * @param executorAkkaConfig Akka config used to initialize the actor system of this executor.
+ *                           It uses io.gearpump.util.Constants.GEARPUMP_CUSTOM_CONFIG_FILE
+ *                           to pass the config to executor process
  */
-case class ExecutorJVMConfig(classPath : Array[String], jvmArguments : Array[String], mainClass : String, arguments : Array[String], jar: Option[AppJar], username : String, executorAkkaConfig: Config = ConfigFactory.empty())
\ No newline at end of file
+// TODO: ExecutorJVMConfig doesn't belong to this package in logic.
+case class ExecutorJVMConfig(
+    classPath: Array[String], jvmArguments: Array[String], mainClass: String,
+    arguments: Array[String], jar: Option[AppJar], username: String,
+    executorAkkaConfig: Config = ConfigFactory.empty())
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala b/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
index 64d5c42..7bae6d6 100644
--- a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
+++ b/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
@@ -19,24 +19,27 @@
 package io.gearpump.cluster
 
 import java.io.File
+
 import com.typesafe.config._
+
 import io.gearpump.util.Constants._
-import io.gearpump.util.{Util, FileUtils, Constants, LogUtil}
-import scala.collection.JavaConversions._
+import io.gearpump.util.{Constants, FileUtils, LogUtil, Util}
 
 /**
  *
  * All Gearpump application should use this class to load configurations.
  *
- * Compared with Akka built-in [[ConfigFactory]], this class will also
- * resolve file gear.conf and geardefault.conf.
+ * Compared with Akka built-in com.typesafe.config.ConfigFactory, this class also
+ * resolves config from files gear.conf and geardefault.conf.
  *
  * Overriding order:
+ * {{{
  *   System Properties
  *     > Custom configuration file (by using system property -Dgearpump.config.file) >
  *     > gear.conf
  *     > geardefault.conf
  *     > reference.conf
+ * }}}
  */
 
 object ClusterConfig {
@@ -81,11 +84,10 @@ object ClusterConfig {
     load(configFile).ui
   }
 
-
   /**
    * try to load system property gearpump.config.file, or use configFile
    */
-  private def load(configFile: String) : Configs = {
+  private def load(configFile: String): Configs = {
     val file = Option(System.getProperty(GEARPUMP_CUSTOM_CONFIG_FILE))
     file match {
       case Some(path) =>
@@ -100,7 +102,7 @@ object ClusterConfig {
   val APPLICATION = "application.conf"
   val LOG = LogUtil.getLogger(getClass)
 
-  def saveConfig(conf : Config, file : File) : Unit = {
+  def saveConfig(conf: Config, file: File): Unit = {
     val serialized = conf.root().render()
     FileUtils.write(file, serialized)
   }
@@ -113,13 +115,13 @@ object ClusterConfig {
     }
   }
 
-  // filter JVM reserved keys and akka default reference.conf
+  /** filter JVM reserved keys and akka default reference.conf */
   def filterOutDefaultConfig(input: Config): Config = {
     val updated = filterOutJvmReservedKeys(input)
     Util.filterOutOrigin(updated, "reference.conf")
   }
 
-  private[gearpump] def load(source: ClusterConfigSource) : Configs = {
+  private[gearpump] def load(source: ClusterConfigSource): Configs = {
 
     val systemProperties = getSystemProperties
 
@@ -162,8 +164,8 @@ object ClusterConfig {
   )
 
   private def getSystemProperties: Config = {
-    // exclude default java system properties
-    JVM_RESERVED_PROPERTIES.foldLeft(ConfigFactory.systemProperties()) {(config, property) =>
+    // Excludes default java system properties
+    JVM_RESERVED_PROPERTIES.foldLeft(ConfigFactory.systemProperties()) { (config, property) =>
       config.withoutPath(property)
     }
   }
@@ -177,5 +179,6 @@ object ClusterConfig {
     filterJvmReservedKeys
   }
 
-  protected class Configs (val master: Config, val worker: Config, val ui: Config, val default: Config)
+  protected class Configs(
+      val master: Config, val worker: Config, val ui: Config, val default: Config)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala b/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala
index 7e8a91d..3a248d7 100644
--- a/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala
+++ b/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala
@@ -19,9 +19,10 @@
 package io.gearpump.cluster
 
 import java.io.File
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
 import scala.language.implicitConversions
 
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+
 /**
  * Data Source of ClusterConfig
  *

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
index aac45ab..5a42ea3 100644
--- a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
+++ b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,26 +18,25 @@
 
 package io.gearpump.cluster
 
+import scala.util.Try
+
 import akka.actor.ActorRef
 import com.typesafe.config.Config
-import io.gearpump.{WorkerId, TimeStamp}
+
+import io.gearpump.TimeStamp
 import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus
 import io.gearpump.cluster.master.MasterSummary
 import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerSummary
+import io.gearpump.cluster.worker.{WorkerId, WorkerSummary}
 import io.gearpump.metrics.Metrics.MetricType
 
-import scala.util.Try
-
-/**
- * Application Flow
- */
-
 object ClientToMaster {
   case object AddMaster
   case class AddWorker(count: Int)
   case class RemoveMaster(masterContainerId: String)
   case class RemoveWorker(workerContainerId: String)
+
+  /** Command result of AddMaster, RemoveMaster, etc. */
   case class CommandResult(success: Boolean, exception: String = null) {
     override def toString: String = {
       val tag = getClass.getSimpleName
@@ -48,50 +47,89 @@ object ClientToMaster {
       }
     }
   }
-  case class SubmitApplication(appDescription: AppDescription, appJar: Option[AppJar], username : String = System.getProperty("user.name"))
+
+  /** Submit an application to master */
+  case class SubmitApplication(
+      appDescription: AppDescription, appJar: Option[AppJar],
+      username: String = System.getProperty("user.name"))
+
   case class RestartApplication(appId: Int)
   case class ShutdownApplication(appId: Int)
+
+  /** Client sends ResolveAppId to Master to resolve AppMaster actor path by providing appId */
   case class ResolveAppId(appId: Int)
 
+  /** Client sends ResolveWorkerId to master to get the Actor path of worker. */
   case class ResolveWorkerId(workerId: WorkerId)
 
+  /** Get an active Jar store to upload job jars, like wordcount.jar */
   case object GetJarStoreServer
 
+  /** Service address of JarStore */
   case class JarStoreServerAddress(url: String)
 
+  /** Query AppMaster config by providing appId */
   case class QueryAppMasterConfig(appId: Int)
 
+  /** Query worker config */
   case class QueryWorkerConfig(workerId: WorkerId)
 
+  /** Query master config */
   case object QueryMasterConfig
 
+  /** Options for reading the metrics from the cluster */
   object ReadOption {
     type ReadOption = String
 
     val Key: String = "readOption"
 
+    /** Read the latest record of the metrics, only return 1 record for one metric name (id) */
     val ReadLatest: ReadOption = "readLatest"
 
+    /** Read recent metrics from cluster, typically it contains metrics in 5 minutes */
     val ReadRecent = "readRecent"
 
+    /**
+     * Read the history metrics, typically it contains metrics for 48 hours
+     *
+     * NOTE: Each hour only contains one or two data points.
+     */
     val ReadHistory = "readHistory"
   }
 
+  /** Query history metrics from master or app master. */
+  case class QueryHistoryMetrics(
+      path: String, readOption: ReadOption.ReadOption = ReadOption.ReadLatest,
+      aggregatorClazz: String = "", options: Map[String, String] = Map.empty[String, String])
 
-  case class QueryHistoryMetrics(path: String, readOption: ReadOption.ReadOption = ReadOption.ReadLatest, aggregatorClazz: String = "", options: Map[String, String] = Map.empty[String, String])
-
+  /**
+   * If there is message loss, the clock would pause for a while. This message is used to
+   * pin-point which task has stalling clock value, and usually it means something wrong on
+   * that machine.
+   */
   case class GetStallingTasks(appId: Int)
 
+  /**
+   * Request app master for a short list of cluster app that administrators should be aware of.
+   */
   case class GetLastFailure(appId: Int)
 }
 
 object MasterToClient {
-  case class SubmitApplicationResult(appId : Try[Int])
+
+  /** Result of SubmitApplication */
+  // TODO: Merge with SubmitApplicationResultValue and change this to (appId: Option, ex: Exception)
+  case class SubmitApplicationResult(appId: Try[Int])
+
   case class SubmitApplicationResultValue(appId: Int)
-  case class ShutdownApplicationResult(appId : Try[Int])
+
+  case class ShutdownApplicationResult(appId: Try[Int])
   case class ReplayApplicationResult(appId: Try[Int])
+
+  /** Return Actor ref of app master */
   case class ResolveAppIdResult(appMaster: Try[ActorRef])
 
+  /** Return Actor ref of worker */
   case class ResolveWorkerIdResult(worker: Try[ActorRef])
 
   case class AppMasterConfig(config: Config)
@@ -102,25 +140,64 @@ object MasterToClient {
 
   case class HistoryMetricsItem(time: TimeStamp, value: MetricType)
 
+  /**
+   * History metrics returned from master, worker, or app master.
+   *
+   * All metric items are organized like a tree, path is used to navigate through the tree.
+   * For example, when querying with path == "executor0.task1.throughput*", the metrics
+   * provider picks metrics whose source matches the path.
+   *
+   * @param path The path client provided. The returned metrics are the result query of this path.
+   * @param metrics The detailed metrics.
+   */
   case class HistoryMetrics(path: String, metrics: List[HistoryMetricsItem])
 
+  /** Return the last error of this streaming application job */
   case class LastFailure(time: TimeStamp, error: String)
 }
 
 trait AppMasterRegisterData
 
 object AppMasterToMaster {
-  case class RegisterAppMaster(appMaster: ActorRef, registerData : AppMasterRegisterData)
+
+  /**
+   * Register an AppMaster by providing an ActorRef and registerData
+   * @param registerData The registerData is provided by Master when starting the app master.
+   *                     App master should return the registerData back to master.
+   *                     Typically registerData holds some context information for this app Master.
+   */
+
+  case class RegisterAppMaster(appMaster: ActorRef, registerData: AppMasterRegisterData)
+
   case class InvalidAppMaster(appId: Int, appMaster: String, reason: Throwable)
+
   case class RequestResource(appId: Int, request: ResourceRequest)
 
+  /**
+   * Each application job can save some data in the distributed cluster storage on master nodes.
+   *
+   * @param appId App Id of the client application that sends the request.
+   * @param key Key name
+   * @param value Value to store on distributed cluster storage on master nodes
+   */
   case class SaveAppData(appId: Int, key: String, value: Any)
+
+  /** The application specific data is successfully stored */
   case object AppDataSaved
+
+  /** Fail to store the application data */
   case object SaveAppDataFailed
 
+  /** Fetch the application specific data that was stored previously */
   case class GetAppData(appId: Int, key: String)
+
+  /** The KV data returned for query GetAppData */
   case class GetAppDataResult(key: String, value: Any)
 
+  /**
+   * AppMasterSummary returned to REST API query. Streaming and Non-streaming
+   * have very different application info. AppMasterSummary is the common interface.
+   */
   trait AppMasterSummary {
     def appType: String
     def appId: Int
@@ -132,37 +209,43 @@ object AppMasterToMaster {
     def user: String
   }
 
+  /** Represents a generic application that is not a streaming job */
   case class GeneralAppMasterSummary(
-    appId: Int,
-    appType: String = "general",
-    appName: String = null,
-    actorPath: String = null,
-    status: AppMasterStatus = MasterToAppMaster.AppMasterActive,
-    startTime: TimeStamp = 0L,
-    uptime: TimeStamp = 0L,
-    user: String = null)
+      appId: Int,
+      appType: String = "general",
+      appName: String = null,
+      actorPath: String = null,
+      status: AppMasterStatus = MasterToAppMaster.AppMasterActive,
+      startTime: TimeStamp = 0L,
+      uptime: TimeStamp = 0L,
+      user: String = null)
     extends AppMasterSummary
 
+  /** Fetches the list of workers from Master */
   case object GetAllWorkers
+
+  /** Get worker data of workerId */
   case class GetWorkerData(workerId: WorkerId)
+
+  /** Response to GetWorkerData */
   case class WorkerData(workerDescription: WorkerSummary)
 
+  /** Get Master data */
   case object GetMasterData
+
+  /** Response to GetMasterData */
   case class MasterData(masterDescription: MasterSummary)
 }
 
 object MasterToAppMaster {
-  case class ResourceAllocated(allocations: Array[ResourceAllocation]){
-    override def equals(other: Any): Boolean = {
-      other match {
-        case that: ResourceAllocated =>
-          allocations.sortBy(_.workerId).sameElements(that.allocations.sortBy(_.workerId))
-        case _ =>
-          false
-      }
-    }
-  }
+
+  /** Resource allocated for application xx */
+  case class ResourceAllocated(allocations: Array[ResourceAllocation])
+
+  /** Master confirms reception of RegisterAppMaster message */
   case class AppMasterRegistered(appId: Int)
+
+  /** Shutdown the application job */
   case object ShutdownAppMaster
 
   type AppMasterStatus = String
@@ -171,7 +254,11 @@ object MasterToAppMaster {
   val AppMasterNonExist: AppMasterStatus = "nonexist"
 
   sealed trait StreamingType
-  case class AppMasterData(status: AppMasterStatus, appId: Int = 0, appName: String = null, appMasterPath: String = null, workerPath: String = null, submissionTime: TimeStamp = 0, startTime: TimeStamp = 0, finishTime: TimeStamp = 0, user: String = null)
+  case class AppMasterData(
+      status: AppMasterStatus, appId: Int = 0, appName: String = null, appMasterPath: String = null,
+      workerPath: String = null, submissionTime: TimeStamp = 0, startTime: TimeStamp = 0,
+      finishTime: TimeStamp = 0, user: String = null)
+
   case class AppMasterDataRequest(appId: Int, detail: Boolean = false)
 
   case class AppMastersData(appMasters: List[AppMasterData])
@@ -185,8 +272,10 @@ object MasterToAppMaster {
 }
 
 object AppMasterToWorker {
-  case class LaunchExecutor(appId: Int, executorId: Int, resource: Resource, executorJvmConfig: ExecutorJVMConfig)
-  case class ShutdownExecutor(appId : Int, executorId : Int, reason : String)
+  case class LaunchExecutor(
+      appId: Int, executorId: Int, resource: Resource, executorJvmConfig: ExecutorJVMConfig)
+
+  case class ShutdownExecutor(appId: Int, executorId: Int, reason: String)
   case class ChangeExecutorResource(appId: Int, executorId: Int, resource: Resource)
 }
 
@@ -196,4 +285,3 @@ object WorkerToAppMaster {
   case class ShutdownExecutorFailed(reason: String = null, ex: Throwable = null)
 }
 
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/UserConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/UserConfig.scala b/core/src/main/scala/io/gearpump/cluster/UserConfig.scala
index 0756d54..61de1dd 100644
--- a/core/src/main/scala/io/gearpump/cluster/UserConfig.scala
+++ b/core/src/main/scala/io/gearpump/cluster/UserConfig.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,15 @@
 
 package io.gearpump.cluster
 
-import akka.actor.{ExtendedActorSystem, ActorSystem}
+import akka.actor.{ActorSystem, ExtendedActorSystem}
 import akka.serialization.JavaSerializer
+
 import io.gearpump.google.common.io.BaseEncoding
 
 /**
  * Immutable configuration
  */
-final class UserConfig(private val _config: Map[String, String])  extends Serializable{
+final class UserConfig(private val _config: Map[String, String]) extends Serializable {
 
   def withBoolean(key: String, value: Boolean): UserConfig = {
     new UserConfig(_config + (key -> value.toString))
@@ -39,7 +40,7 @@ final class UserConfig(private val _config: Map[String, String])  extends Serial
     new UserConfig(_config + (key -> value.toString))
   }
 
-  def withInt(key: String, value: Int) : UserConfig = {
+  def withInt(key: String, value: Int): UserConfig = {
     new UserConfig(_config + (key -> value.toString))
   }
 
@@ -77,7 +78,7 @@ final class UserConfig(private val _config: Map[String, String])  extends Serial
     _config.get(key).map(_.toFloat)
   }
 
-  def getInt(key : String) : Option[Int] = {
+  def getInt(key: String): Option[Int] = {
     _config.get(key).map(_.toInt)
   }
 
@@ -85,11 +86,11 @@ final class UserConfig(private val _config: Map[String, String])  extends Serial
     _config.get(key).map(_.toLong)
   }
 
-  def getString(key : String) : Option[String] = {
+  def getString(key: String): Option[String] = {
     _config.get(key)
   }
 
-  def getBytes(key: String) : Option[Array[Byte]] = {
+  def getBytes(key: String): Option[Array[Byte]] = {
     _config.get(key).map(BaseEncoding.base64().decode(_))
   }
 
@@ -101,31 +102,35 @@ final class UserConfig(private val _config: Map[String, String])  extends Serial
     }
   }
 
+  // scalastyle:off line.size.limit
   /**
-   * This will de-serialize value to object instance
+   * This de-serializes value to object instance
    *
    * To do de-serialization, this requires an implicit ActorSystem, as
    * the ActorRef and possibly other akka classes deserialization
    * requires an implicit ActorSystem.
    *
-   * @see [[http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization]]
+   * See Link:
+   * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization
    */
-  def getValue[T](key: String)(implicit system: ActorSystem): Option[T]  = {
+
+  def getValue[T](key: String)(implicit system: ActorSystem): Option[T] = {
     val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
     _config.get(key).map(BaseEncoding.base64().decode(_))
       .map(serializer.fromBinary(_).asInstanceOf[T])
   }
 
   /**
-   * This will serialize the object and store it as string.
+   * This serializes the object and stores it as a string.
    *
    * To do serialization, this requires an implicit ActorSystem, as
    * the ActorRef and possibly other akka classes serialization
    * requires an implicit ActorSystem.
    *
-   * @see [[http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization]]
+   * See Link:
+   * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization
    */
-  def withValue[T<: AnyRef](key: String, value: T)(implicit system: ActorSystem): UserConfig = {
+  def withValue[T <: AnyRef](key: String, value: T)(implicit system: ActorSystem): UserConfig = {
 
     if (null == value) {
       this
@@ -136,8 +141,9 @@ final class UserConfig(private val _config: Map[String, String])  extends Serial
       this.withString(key, encoded)
     }
   }
+  // scalastyle:on line.size.limit
 
-  def withConfig(other: UserConfig) = {
+  def withConfig(other: UserConfig): UserConfig = {
     if (null == other) {
       this
     } else {
@@ -146,11 +152,11 @@ final class UserConfig(private val _config: Map[String, String])  extends Serial
   }
 }
 
-object UserConfig{
+object UserConfig {
 
-  def empty = new UserConfig(Map.empty[String, String])
+  def empty: UserConfig = new UserConfig(Map.empty[String, String])
 
-  def apply(config : Map[String, String]) = new UserConfig(config)
+  def apply(config: Map[String, String]): UserConfig = new UserConfig(config)
 
   def unapply(config: UserConfig): Option[Map[String, String]] = Option(config._config)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
index 1e34678..4e8582b 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
@@ -19,32 +19,34 @@
 package io.gearpump.cluster.appmaster
 
 import akka.actor._
+
 import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
 import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._
 import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{Session, StartExecutorSystems}
 import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus._
 import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.cluster.{AppMasterContext, AppDescription}
+import io.gearpump.cluster.{AppDescription, AppMasterContext}
 import io.gearpump.util.LogUtil
 
 /**
  * This serves as runtime environment for AppMaster.
- * When starting an AppMaster, we need to setup the connection to master, and prepare other environemnts.
+ * When starting an AppMaster, we need to setup the connection to master,
+ * and prepare other environments.
  *
  * This also extend the function of Master, by providing a scheduler service for Executor System.
  * AppMaster can ask Master for executor system directly. details like requesting resource,
- * contacting worker to start a process, and then starting an executor system is hidden from AppMaster.
+ * contacting worker to start a process, and then starting an executor system is hidden from
+ * AppMaster.
  *
  * Please use AppMasterRuntimeEnvironment.props() to construct this actor.
- *
  */
 private[appmaster]
-class AppMasterRuntimeEnvironment (
+class AppMasterRuntimeEnvironment(
     appContextInput: AppMasterContext,
     app: AppDescription,
     masters: Iterable[ActorPath],
     masterFactory: (AppId, MasterActorRef) => Props,
-    appMasterFactory: (AppMasterContext, AppDescription)=> Props,
+    appMasterFactory: (AppMasterContext, AppDescription) => Props,
     masterConnectionKeeperFactory: (MasterActorRef, RegisterAppMaster, ListenerActorRef) => Props)
   extends Actor {
 
@@ -52,15 +54,18 @@ class AppMasterRuntimeEnvironment (
   private val LOG = LogUtil.getLogger(getClass, app = appId)
 
   import scala.concurrent.duration._
-  private val master = context.actorOf(masterFactory(appId, context.actorOf(Props(new MasterProxy(masters, 30 seconds)))))
+
+  private val master = context.actorOf(
+    masterFactory(appId, context.actorOf(Props(new MasterProxy(masters, 30.seconds)))))
   private val appContext = appContextInput.copy(masterProxy = master)
 
-  //create appMaster proxy to receive command and forward to appmaster
+  // Create appMaster proxy to receive command and forward to appmaster
   private val appMaster = context.actorOf(appMasterFactory(appContext, app))
   context.watch(appMaster)
 
   private val registerAppMaster = RegisterAppMaster(appMaster, appContext.registerData)
-  private val masterConnectionKeeper =  context.actorOf(masterConnectionKeeperFactory(master, registerAppMaster, self))
+  private val masterConnectionKeeper = context.actorOf(
+    masterConnectionKeeperFactory(master, registerAppMaster, self))
   context.watch(masterConnectionKeeper)
 
   def receive: Receive = {
@@ -72,20 +77,21 @@ class AppMasterRuntimeEnvironment (
       context.stop(self)
     case Terminated(actor) => actor match {
       case `appMaster` =>
-        LOG.error (s"AppMaster ${appId} is stopped, shutdown myself")
-        context.stop (self)
+        LOG.error(s"AppMaster ${appId} is stopped, shutdown myself")
+        context.stop(self)
       case `masterConnectionKeeper` =>
-        LOG.error (s"Master connection keeper is stopped, appId: ${appId}, shutdown myself")
-        context.stop (self)
-      case _ => //skip
+        LOG.error(s"Master connection keeper is stopped, appId: ${appId}, shutdown myself")
+        context.stop(self)
+      case _ => // Skip
     }
   }
 }
 
 object AppMasterRuntimeEnvironment {
 
-
-  def props(masters: Iterable[ActorPath], app : AppDescription, appContextInput: AppMasterContext): Props = {
+  def props(
+      masters: Iterable[ActorPath], app: AppDescription, appContextInput: AppMasterContext)
+    : Props = {
 
     val master = (appId: AppId, masterProxy: MasterActorRef) =>
       MasterWithExecutorSystemProvider.props(appId, masterProxy)
@@ -93,26 +99,25 @@ object AppMasterRuntimeEnvironment {
     val appMaster = (appContext: AppMasterContext, app: AppDescription) =>
       LazyStartAppMaster.props(appContext, app)
 
-    val masterConnectionKeeper =
-      (master: MasterActorRef, registerAppMaster: RegisterAppMaster, listener: ListenerActorRef) =>
-      Props(new MasterConnectionKeeper(registerAppMaster, master, masterStatusListener = listener))
+    val masterConnectionKeeper = (master: MasterActorRef, registerAppMaster:
+      RegisterAppMaster, listener: ListenerActorRef) => Props(new MasterConnectionKeeper(
+        registerAppMaster, master, masterStatusListener = listener))
 
     Props(new AppMasterRuntimeEnvironment(
       appContextInput, app, masters, master, appMaster, masterConnectionKeeper))
   }
 
   /**
-   * This behavior as AppMaster, and will lazy start the real AppMaster. When real AppMaster
-   * is not started yet, all messages will be stashed. The stashed messages will be forwarded to
-   * real AppMaster when it is started.
+   * This behaves like an AppMaster. Under the hood, it starts the real AppMaster in a lazy
+   * way. When real AppMaster is not started yet, all messages are stashed. The stashed
+   * messages are forwarded to real AppMaster when the real AppMaster is started.
    *
    * Please use LazyStartAppMaster.props to construct this actor
    *
-   * @param appId
    * @param appMasterProps  underlying AppMaster Props
    */
   private[appmaster]
-  class LazyStartAppMaster (appId: Int, appMasterProps: Props) extends Actor with Stash {
+  class LazyStartAppMaster(appId: Int, appMasterProps: Props) extends Actor with Stash {
 
     private val LOG = LogUtil.getLogger(getClass, app = appId)
 
@@ -151,14 +156,11 @@ object AppMasterRuntimeEnvironment {
 
   private[appmaster] case object StartAppMaster
 
-
   /**
    * This enhance Master by providing new service: StartExecutorSystems
    *
-   * * Please use MasterWithExecutorSystemProvider.props to construct this actor
+   * Please use MasterWithExecutorSystemProvider.props to construct this actor
    *
-   * @param master
-   * @param executorSystemProviderProps
    */
   private[appmaster]
   class MasterWithExecutorSystemProvider(master: ActorRef, executorSystemProviderProps: Props)
@@ -168,7 +170,7 @@ object AppMasterRuntimeEnvironment {
 
     override def receive: Receive = {
       case request: StartExecutorSystems =>
-        executorSystemProvider forward  request
+        executorSystemProvider forward request
       case msg =>
         master forward msg
     }
@@ -181,13 +183,12 @@ object AppMasterRuntimeEnvironment {
       val executorSystemLauncher = (appId: Int, session: Session) =>
         Props(new ExecutorSystemLauncher(appId, session))
 
-      val scheduler = Props(new ExecutorSystemScheduler(appId, master,  executorSystemLauncher))
+      val scheduler = Props(new ExecutorSystemScheduler(appId, master, executorSystemLauncher))
 
       Props(new MasterWithExecutorSystemProvider(master, scheduler))
     }
   }
 
-
   private[appmaster] type AppId = Int
   private[appmaster] type MasterActorRef = ActorRef
   private[appmaster] type ListenerActorRef = ActorRef

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
index 11414cf..5b3e0c5 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,21 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.cluster.appmaster
 
 import akka.actor.ActorRef
 import com.typesafe.config.Config
+
 import io.gearpump._
 import io.gearpump.cluster.AppMasterRegisterData
 
+/** Run time info used to start an AppMaster */
 case class AppMasterRuntimeInfo(
-     appId: Int,
-     // appName is the unique Id for an application
-     appName: String,
-     worker : ActorRef = null,
-     user: String = null,
-     submissionTime: TimeStamp = 0,
-     startTime: TimeStamp = 0,
-     finishTime: TimeStamp = 0,
-     config: Config = null)
+    appId: Int,
+    // The appName is the unique Id for an application
+    appName: String,
+    worker: ActorRef = null,
+    user: String = null,
+    submissionTime: TimeStamp = 0,
+    startTime: TimeStamp = 0,
+    finishTime: TimeStamp = 0,
+    config: Config = null)
   extends AppMasterRegisterData

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
index 430c958..3b967f4 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
@@ -18,28 +18,30 @@
 
 package io.gearpump.cluster.appmaster
 
-import io.gearpump.cluster.{AppJar, AppDescription}
+import io.gearpump.cluster.{AppDescription, AppJar}
 
 /**
-  * This state will be persisted across the masters.
-  */
-case class ApplicationState(appId : Int, appName: String, attemptId : Int, app : AppDescription, jar: Option[AppJar], username : String, state : Any) extends Serializable {
+ * This is the state of a single application; it is distributed across the masters.
+ */
+case class ApplicationState(
+    appId: Int, appName: String, attemptId: Int, app: AppDescription, jar: Option[AppJar],
+    username: String, state: Any) extends Serializable {
 
-   override def equals(other: Any): Boolean = {
-     other match {
-       case that: ApplicationState =>
-         if (appId == that.appId && attemptId == that.attemptId) {
-           true
-         } else {
-           false
-         }
-       case _ =>
-         false
-     }
-   }
+  override def equals(other: Any): Boolean = {
+    other match {
+      case that: ApplicationState =>
+        if (appId == that.appId && attemptId == that.attemptId) {
+          true
+        } else {
+          false
+        }
+      case _ =>
+        false
+    }
+  }
 
-   override def hashCode: Int = {
-     import akka.routing.MurmurHash._
-     extendHash(appId, attemptId, startMagicA, startMagicB)
-   }
- }
+  override def hashCode: Int = {
+    import akka.routing.MurmurHash._
+    extendHash(appId, attemptId, startMagicA, startMagicB)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
index 400b61c..6fcb5e7 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
@@ -19,22 +19,25 @@
 package io.gearpump.cluster.appmaster
 
 import akka.actor.{ActorRef, Address, PoisonPill}
-import io.gearpump.WorkerId
+
 import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.util.ActorSystemBooter.BindLifeCycle
 
 case class WorkerInfo(workerId: WorkerId, ref: ActorRef)
 
 /**
- * This contains JVM configurations to start an executor system
+ * Configurations to start an executor system on remote machine
+ *
+ * @param address Remote address where we start an Actor System.
  */
 case class ExecutorSystem(executorSystemId: Int, address: Address, daemon:
-ActorRef, resource: Resource, worker: WorkerInfo) {
+    ActorRef, resource: Resource, worker: WorkerInfo) {
   def bindLifeCycleWith(actor: ActorRef): Unit = {
     daemon ! BindLifeCycle(actor)
   }
 
-  def shutdown: Unit = {
+  def shutdown(): Unit = {
     daemon ! PoisonPill
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
index 80af748..78432f4 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
@@ -18,41 +18,42 @@
 
 package io.gearpump.cluster.appmaster
 
+import scala.concurrent.duration._
+
 import akka.actor._
+import org.slf4j.Logger
+
 import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
 import io.gearpump.cluster.ExecutorJVMConfig
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.util.{Constants, ActorSystemBooter, ActorUtil, LogUtil}
 import io.gearpump.cluster.WorkerToAppMaster._
 import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
 import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, Session}
-import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, BindLifeCycle, RegisterActorSystem}
-import org.slf4j.Logger
-
-import scala.concurrent.duration._
+import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem}
+import io.gearpump.util.{ActorSystemBooter, ActorUtil, Constants, LogUtil}
 
 /**
  * This launches single executor system on target worker.
  *
  * Please use ExecutorSystemLauncher.props() to construct this actor
  *
- * @param appId
  * @param session The session that request to launch executor system
  */
 private[appmaster]
-class ExecutorSystemLauncher (appId: Int, session: Session) extends Actor {
+class ExecutorSystemLauncher(appId: Int, session: Session) extends Actor {
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
   val scheduler = context.system.scheduler
   implicit val executionContext = context.dispatcher
 
-  val timeoutSetting = context.system.settings.config.getInt(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS)
+  private val systemConfig = context.system.settings.config
+  val timeoutSetting = systemConfig.getInt(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS)
 
-  val timeout = scheduler.scheduleOnce(timeoutSetting milliseconds,
+  val timeout = scheduler.scheduleOnce(timeoutSetting.milliseconds,
     self, LaunchExecutorSystemTimeout(session))
 
-  def receive : Receive = waitForLaunchCommand
+  def receive: Receive = waitForLaunchCommand
 
   def waitForLaunchCommand: Receive = {
     case LaunchExecutorSystem(worker, executorSystemId, resource) =>
@@ -61,23 +62,27 @@ class ExecutorSystemLauncher (appId: Int, session: Session) extends Actor {
         .map(getExecutorJvmConfig(_, s"app${appId}system${executorSystemId}", launcherPath)).orNull
 
       val launch = LaunchExecutor(appId, executorSystemId, resource, jvmConfig)
-      LOG.info(s"Launching Executor ...appId: $appId, executorSystemId: $executorSystemId, slots: ${resource.slots} on worker $worker")
+      LOG.info(s"Launching Executor ...appId: $appId, executorSystemId: $executorSystemId, " +
+        s"slots: ${resource.slots} on worker $worker")
 
       worker.ref ! launch
       context.become(waitForActorSystemToStart(sender, launch, worker, executorSystemId))
   }
 
-  def waitForActorSystemToStart(replyTo: ActorRef, launch: LaunchExecutor, worker: WorkerInfo, executorSystemId: Int) : Receive = {
+  def waitForActorSystemToStart(
+      replyTo: ActorRef, launch: LaunchExecutor, worker: WorkerInfo, executorSystemId: Int)
+    : Receive = {
     case RegisterActorSystem(systemPath) =>
       import launch._
       timeout.cancel()
       LOG.info(s"Received RegisterActorSystem $systemPath for session ${session.requestor}")
       sender ! ActorSystemRegistered(worker.ref)
-      val system = ExecutorSystem(executorId, AddressFromURIString(systemPath), sender, resource, worker)
+      val system =
+        ExecutorSystem(executorId, AddressFromURIString(systemPath), sender, resource, worker)
       replyTo ! LaunchExecutorSystemSuccess(system, session)
       context.stop(self)
-    case reject @ ExecutorLaunchRejected(reason, ex) =>
-      LOG.error(s"Executor Launch ${launch.resource} failed reason:$reason", ex)
+    case reject@ExecutorLaunchRejected(reason, ex) =>
+      LOG.error(s"Executor Launch ${launch.resource} failed reason: $reason", ex)
       replyTo ! LaunchExecutorSystemRejected(launch.resource, reason, session)
       context.stop(self)
     case timeout: LaunchExecutorSystemTimeout =>
@@ -89,6 +94,7 @@ class ExecutorSystemLauncher (appId: Int, session: Session) extends Actor {
 
 private[appmaster]
 object ExecutorSystemLauncher {
+
   case class LaunchExecutorSystem(worker: WorkerInfo, systemId: Int, resource: Resource)
 
   case class LaunchExecutorSystemSuccess(system: ExecutorSystem, session: Session)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
index aa980b5..c5ec600 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
@@ -18,29 +18,29 @@
 
 package io.gearpump.cluster.appmaster
 
+import scala.concurrent.duration._
+
 import akka.actor._
 import com.typesafe.config.Config
-import io.gearpump.WorkerId
+
 import io.gearpump.cluster.AppMasterToMaster.RequestResource
 import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
 import io.gearpump.cluster._
 import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
 import io.gearpump.cluster.appmaster.ExecutorSystemScheduler._
 import io.gearpump.cluster.scheduler.{ResourceAllocation, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.util.{Constants, LogUtil}
 
-import scala.concurrent.duration._
-
 /**
  * ExecutorSystem is also a type of resource, this class schedules ExecutorSystem for AppMaster.
  * AppMaster can use this class to directly request a live executor actor systems. The communication
  * in the background with Master and Worker is hidden from AppMaster.
  *
  * Please use ExecutorSystemScheduler.props() to construct this actor
- *
-*/
+ */
 private[appmaster]
-class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
+class ExecutorSystemScheduler(appId: Int, masterProxy: ActorRef,
     executorSystemLauncher: (Int, Session) => Props) extends Actor {
 
   private val LOG = LogUtil.getLogger(getClass, app = appId)
@@ -50,18 +50,22 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
 
   var resourceAgents = Map.empty[Session, ActorRef]
 
-  def receive: Receive = clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler
+  def receive: Receive = {
+    clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler
+  }
 
   def clientCommands: Receive = {
     case start: StartExecutorSystems =>
-      LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), Resources(${start.resources.mkString(",")}))")
+      LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), " +
+        s"Resources(${start.resources.mkString(",")}))")
       val requestor = sender()
       val executorSystemConfig = start.executorSystemConfig
-      val session  = Session(requestor, executorSystemConfig)
-      val agent = resourceAgents.getOrElse(session, context.actorOf(Props(new ResourceAgent(masterProxy, session))))
+      val session = Session(requestor, executorSystemConfig)
+      val agent = resourceAgents.getOrElse(session,
+        context.actorOf(Props(new ResourceAgent(masterProxy, session))))
       resourceAgents = resourceAgents + (session -> agent)
 
-      start.resources.foreach {resource =>
+      start.resources.foreach { resource =>
         agent ! RequestResource(appId, resource)
       }
 
@@ -87,14 +91,15 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
       }
   }
 
-  def executorSystemMessageHandler : Receive = {
+  def executorSystemMessageHandler: Receive = {
     case LaunchExecutorSystemSuccess(system, session) =>
       if (isSessionAlive(session)) {
         LOG.info("LaunchExecutorSystemSuccess, send back to " + session.requestor)
         system.bindLifeCycleWith(self)
         session.requestor ! ExecutorSystemStarted(system, session.executorSystemJvmConfig.jar)
       } else {
-        LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. Will shutdown the allocated system")
+        LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. " +
+          "Will shutdown the allocated system")
         system.shutdown
       }
     case LaunchExecutorSystemTimeout(session) =>
@@ -105,8 +110,11 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
 
     case LaunchExecutorSystemRejected(resource, reason, session) =>
       if (isSessionAlive(session)) {
-        LOG.error(s"Failed to launch executor system, due to $reason, will ask master to allocate new resources $resource")
-        resourceAgents.get(session).map(_ ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)))
+        LOG.error(s"Failed to launch executor system, due to $reason, " +
+          s"will ask master to allocate new resources $resource")
+        resourceAgents.get(session).map { resourceAgent: ActorRef =>
+          resourceAgent ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
+        }
       }
   }
 
@@ -117,27 +125,28 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
 
 object ExecutorSystemScheduler {
 
-  case class StartExecutorSystems(resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig)
+  case class StartExecutorSystems(
+      resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig)
+
   case class ExecutorSystemStarted(system: ExecutorSystem, boundedJar: Option[AppJar])
+
   case class StopExecutorSystem(system: ExecutorSystem)
-  case object StartExecutorSystemTimeout
 
-  case class ExecutorSystemJvmConfig(classPath : Array[String], jvmArguments : Array[String],
-     jar: Option[AppJar], username : String, executorAkkaConfig: Config = null)
+  case object StartExecutorSystemTimeout
 
+  case class ExecutorSystemJvmConfig(classPath: Array[String], jvmArguments: Array[String],
+      jar: Option[AppJar], username: String, executorAkkaConfig: Config = null)
 
   /**
    * For each client which ask for an executor system, the scheduler will create a session for it.
    *
-   * @param requestor
-   * @param executorSystemJvmConfig
    */
-  private [appmaster]
+  private[appmaster]
   case class Session(requestor: ActorRef, executorSystemJvmConfig: ExecutorSystemJvmConfig)
 
   /**
    * This is a agent for session to request resource
-   * @param master
+   *
    * @param session the original requester of the resource requests
    */
   private[appmaster]
@@ -145,17 +154,19 @@ object ExecutorSystemScheduler {
     private var resourceRequestor: ActorRef = null
     var timeOutClock: Cancellable = null
     private var unallocatedResource: Int = 0
+
     import context.dispatcher
 
-    import Constants._
-    val timeout = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT)
+    import io.gearpump.util.Constants._
 
+    val timeout = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT)
 
     def receive: Receive = {
       case request: RequestResource =>
         unallocatedResource += request.request.resource.slots
         Option(timeOutClock).map(_.cancel)
-        timeOutClock = context.system.scheduler.scheduleOnce(timeout seconds, self, ResourceAllocationTimeOut(session))
+        timeOutClock = context.system.scheduler.scheduleOnce(
+          timeout.seconds, self, ResourceAllocationTimeOut(session))
         resourceRequestor = sender
         master ! request
       case ResourceAllocated(allocations) =>
@@ -164,7 +175,7 @@ object ExecutorSystemScheduler {
       case timeout: ResourceAllocationTimeOut =>
         if (unallocatedResource > 0) {
           resourceRequestor ! ResourceAllocationTimeOut(session)
-          //we will not receive any ResourceAllocation after timeout
+          // We will not receive any ResourceAllocation after timeout
           context.stop(self)
         }
     }
@@ -175,4 +186,5 @@ object ExecutorSystemScheduler {
 
   private[ExecutorSystemScheduler]
   case class ResourceAllocationTimeOut(session: Session)
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
index 300c4ea..f8c8503 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
@@ -19,31 +19,27 @@
 package io.gearpump.cluster.appmaster
 
 import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
 
 import akka.actor._
-import io.gearpump.cluster.master.MasterProxy.MasterRestarted
+
 import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
 import io.gearpump.cluster.MasterToAppMaster.AppMasterRegistered
 import io.gearpump.cluster.appmaster.MasterConnectionKeeper.AppMasterRegisterTimeout
 import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped}
-import io.gearpump.cluster.master.MasterProxy.WatchMaster
+import io.gearpump.cluster.master.MasterProxy.{MasterRestarted, WatchMaster}
 import io.gearpump.util.LogUtil
 
-import scala.concurrent.duration.FiniteDuration
-
 /**
- * This will watch the liveness of Master.
- * When Master is restarted, it will send RegisterAppMaster to the new Master instance.
- * If Master is stopped, it will send the MasterConnectionStatus to listener
+ * Watches the liveness of Master.
  *
- * please use MasterConnectionKeeper.props() to construct this actor
+ * When Master is restarted, it sends RegisterAppMaster to the new Master instance.
+ * If Master is stopped, it sends the MasterConnectionStatus to listener
  *
- * @param register
- * @param masterProxy
- * @param masterStatusListener
+ * Please use MasterConnectionKeeper.props() to construct this actor
  */
 private[appmaster]
-class MasterConnectionKeeper (
+class MasterConnectionKeeper(
     register: RegisterAppMaster, masterProxy: ActorRef, masterStatusListener: ActorRef)
   extends Actor {
 
@@ -52,12 +48,13 @@ class MasterConnectionKeeper (
   private val LOG = LogUtil.getLogger(getClass)
   private var master: ActorRef = null
 
-  //Subscribe self to masterProxy,
+  // Subscribe self to masterProxy,
   masterProxy ! WatchMaster(self)
 
   def registerAppMaster: Cancellable = {
     masterProxy ! register
-    context.system.scheduler.scheduleOnce(FiniteDuration(30, TimeUnit.SECONDS), self, AppMasterRegisterTimeout)
+    context.system.scheduler.scheduleOnce(FiniteDuration(30, TimeUnit.SECONDS),
+      self, AppMasterRegisterTimeout)
   }
 
   context.become(waitMasterToConfirm(registerAppMaster))
@@ -87,10 +84,15 @@ class MasterConnectionKeeper (
 }
 
 private[appmaster] object MasterConnectionKeeper {
+
   case object AppMasterRegisterTimeout
 
   object MasterConnectionStatus {
+
     case object MasterConnected
+
     case object MasterStopped
+
   }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
index 3a2868c..41c01d8 100644
--- a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,10 +19,16 @@
 package io.gearpump.cluster.client
 
 import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.Try
 
 import akka.actor.{ActorRef, ActorSystem}
 import akka.util.Timeout
-import com.typesafe.config.{ConfigValueFactory, Config}
+import com.typesafe.config.{Config, ConfigValueFactory}
+import org.slf4j.Logger
+
 import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge}
 import io.gearpump.cluster.MasterToClient.ReplayApplicationResult
 import io.gearpump.cluster._
@@ -30,17 +36,12 @@ import io.gearpump.cluster.master.MasterProxy
 import io.gearpump.jarstore.JarStoreService
 import io.gearpump.util.Constants._
 import io.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
-import org.slf4j.Logger
-
-import scala.collection.JavaConversions._
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration.Duration
-import scala.util.Try
 
 /**
  * ClientContext is a user facing util to submit/manage an application.
+ *
+ * TODO: add interface to query master here
  */
-//TODO: add interface to query master here
 class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
 
   def this(system: ActorSystem) = {
@@ -54,7 +55,7 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
   private val LOG: Logger = LogUtil.getLogger(getClass)
   private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
 
-  implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt}" , config))
+  implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
   LOG.info(s"Starting system ${system.name}")
   val shouldCleanupSystem = Option(sys).isEmpty
 
@@ -62,16 +63,18 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
   jarStoreService.init(config, system)
 
   private lazy val master: ActorRef = {
-    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).toList.flatMap(Util.parseHostList)
-    val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters), s"masterproxy${system.name}"))
+    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+      .flatMap(Util.parseHostList)
+    val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters),
+      s"masterproxy${system.name}"))
     LOG.info(s"Creating master proxy ${master} for master list: $masters")
     master
   }
 
   /**
-   * Submit an application with default jar setting. Use java property
-   * "gearpump.app.jar" if defined. Otherwise, will assume the jar is on
-   * the target runtime classpath, and will not send it.
+   * Submits an application with default jar setting. Use java property "gearpump.app.jar" if
+   * defined. Otherwise, it assumes the jar is on the target runtime classpath, thus will
+   * not send the jar across the wire.
    */
   def submit(app: Application): Int = {
     submit(app, System.getProperty(GEARPUMP_APP_JAR))
@@ -86,7 +89,8 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
     val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX))
     val submissionConfig = getSubmissionConfig(config)
       .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum))
-    val appDescription = AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig)
+    val appDescription =
+      AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig)
     val appJar = Option(jar).map(loadFile)
     client.submitApplication(appDescription, appJar)
   }
@@ -99,9 +103,11 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
     ClusterConfig.filterOutDefaultConfig(config)
   }
 
-  def replayFromTimestampWindowTrailingEdge(appId : Int): ReplayApplicationResult = {
+  def replayFromTimestampWindowTrailingEdge(appId: Int): ReplayApplicationResult = {
     import scala.concurrent.ExecutionContext.Implicits.global
-    val result = Await.result(ActorUtil.askAppMaster[ReplayApplicationResult](master, appId,ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf)
+    val result = Await.result(
+      ActorUtil.askAppMaster[ReplayApplicationResult](master,
+        appId, ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf)
     result
   }
 
@@ -115,30 +121,30 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
     client.listApplications
   }
 
-  def shutdown(appId : Int) : Unit = {
+  def shutdown(appId: Int): Unit = {
     val client = getMasterClient
     client.shutdownApplication(appId)
   }
 
-  def resolveAppID(appId: Int) : ActorRef = {
+  def resolveAppID(appId: Int): ActorRef = {
     val client = getMasterClient
     client.resolveAppId(appId)
   }
 
-  def close() : Unit = {
+  def close(): Unit = {
     if (shouldCleanupSystem) {
       LOG.info(s"Shutting down system ${system.name}")
-      system.shutdown()
+      system.terminate()
     }
   }
 
-  private def loadFile(jarPath : String) : AppJar = {
+  private def loadFile(jarPath: String): AppJar = {
     val jarFile = new java.io.File(jarPath)
     val path = jarStoreService.copyFromLocal(jarFile)
     AppJar(jarFile.getName, path)
   }
 
-  private def checkAndAddNamePrefix(appName: String, namePrefix: String) : String = {
+  private def checkAndAddNamePrefix(appName: String, namePrefix: String): String = {
     val fullName = if (namePrefix != null && namePrefix != "") {
       namePrefix + "_" + appName
     } else {
@@ -163,12 +169,17 @@ object ClientContext {
 
   def apply(): ClientContext = new ClientContext(ClusterConfig.default(), null, null)
 
-  def apply(system: ActorSystem) = new ClientContext(ClusterConfig.default(), system, null)
+  def apply(system: ActorSystem): ClientContext = {
+    new ClientContext(ClusterConfig.default(), system, null)
+  }
 
-  def apply(system: ActorSystem, master: ActorRef) = new ClientContext(ClusterConfig.default(), system, master)
+  def apply(system: ActorSystem, master: ActorRef): ClientContext = {
+    new ClientContext(ClusterConfig.default(), system, master)
+  }
 
   def apply(config: Config): ClientContext = new ClientContext(config, null, null)
 
-  def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext
-    = new ClientContext(config, system, master)
+  def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = {
+    new ClientContext(config, system, master)
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala b/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
index 5800e8d..9edaf46 100644
--- a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
+++ b/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,31 +18,36 @@
 
 package io.gearpump.cluster.client
 
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.{Failure, Success}
+
 import akka.actor.ActorRef
 import akka.pattern.ask
 import akka.util.Timeout
-import io.gearpump.cluster.ClientToMaster._
-import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge}
-import io.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
-import io.gearpump.cluster.{AppJar, AppDescription}
-import io.gearpump.util.{ActorUtil, Constants}
 
-import scala.concurrent.duration.Duration
-import scala.concurrent.{ExecutionContext, Await, Future}
-import scala.util.{Failure, Success}
+import io.gearpump.cluster.ClientToMaster._
+import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest}
+import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
+import io.gearpump.cluster.{AppDescription, AppJar}
 
 /**
- * Client to Master node.
- * Stateless, thread safe
+ * Client to inter-operate with Master node.
+ *
+ * NOTE: Stateless, thread safe
  */
-class MasterClient(master : ActorRef, timeout: Timeout) {
+class MasterClient(master: ActorRef, timeout: Timeout) {
   implicit val masterClientTimeout = timeout
 
-  def submitApplication(app : AppDescription, appJar: Option[AppJar]) : Int = {
-    val result = Await.result( (master ? SubmitApplication(app, appJar)).asInstanceOf[Future[SubmitApplicationResult]], Duration.Inf)
+  def submitApplication(app: AppDescription, appJar: Option[AppJar]): Int = {
+    val result = Await.result(
+      (master ? SubmitApplication(app, appJar)).asInstanceOf[Future[SubmitApplicationResult]],
+      Duration.Inf)
     val appId = result.appId match {
       case Success(appId) =>
+        // scalastyle:off println
         Console.println(s"Submit application succeed. The application id is $appId")
+        // scalastyle:on println
         appId
       case Failure(ex) => throw ex
     }
@@ -50,15 +55,18 @@ class MasterClient(master : ActorRef, timeout: Timeout) {
   }
 
   def resolveAppId(appId: Int): ActorRef = {
-    val result = Await.result((master ? ResolveAppId(appId)).asInstanceOf[Future[ResolveAppIdResult]], Duration.Inf)
+    val result = Await.result(
+      (master ? ResolveAppId(appId)).asInstanceOf[Future[ResolveAppIdResult]], Duration.Inf)
     result.appMaster match {
       case Success(appMaster) => appMaster
       case Failure(ex) => throw ex
     }
   }
 
-  def shutdownApplication(appId : Int) : Unit = {
-    val result = Await.result((master ? ShutdownApplication(appId)).asInstanceOf[Future[ShutdownApplicationResult]], Duration.Inf)
+  def shutdownApplication(appId: Int): Unit = {
+    val result = Await.result(
+      (master ? ShutdownApplication(appId)).asInstanceOf[Future[ShutdownApplicationResult]],
+      Duration.Inf)
     result.appId match {
       case Success(_) =>
       case Failure(ex) => throw ex
@@ -66,7 +74,8 @@ class MasterClient(master : ActorRef, timeout: Timeout) {
   }
 
   def listApplications: AppMastersData = {
-    val result = Await.result((master ? AppMastersDataRequest).asInstanceOf[Future[AppMastersData]], Duration.Inf)
+    val result = Await.result(
+      (master ? AppMastersDataRequest).asInstanceOf[Future[AppMastersData]], Duration.Inf)
     result
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala b/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
index f290241..209f831 100644
--- a/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
+++ b/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,39 +20,44 @@ package io.gearpump.cluster.main
 
 import io.gearpump.cluster.main.ArgumentsParser.Syntax
 
-case class CLIOption[+T] (description:String = "", required: Boolean = false, defaultValue: Option[T] = None)
+case class CLIOption[+T](
+    description: String = "", required: Boolean = false, defaultValue: Option[T] = None)
 
-class ParseResult(optionMap : Map[String, String], remainArguments : Array[String]) {
-  def getInt(key : String) = optionMap.get(key).get.toInt
+class ParseResult(optionMap: Map[String, String], remainArguments: Array[String]) {
+  def getInt(key: String): Int = optionMap.get(key).get.toInt
 
-  def getString (key : String) = optionMap.get(key).get
+  def getString(key: String): String = optionMap.get(key).get
 
-  def getBoolean (key : String) = optionMap.get(key).get.toBoolean
+  def getBoolean(key: String): Boolean = optionMap.get(key).get.toBoolean
 
-  def exists(key : String) = !optionMap.getOrElse(key,"").isEmpty
+  def exists(key: String): Boolean = !(optionMap.getOrElse(key, "").isEmpty)
 
-  def remainArgs : Array[String] = this.remainArguments
+  def remainArgs: Array[String] = this.remainArguments
 }
 
 /**
- * Parse command line arguments
+ * Parser for command line arguments
+ *
  * Grammar: -option1 value1 -option2 value3 -flag1 -flag2 remainArg1 remainArg2...
  */
-trait  ArgumentsParser {
+trait ArgumentsParser {
 
   val ignoreUnknownArgument = false
 
-  def help: Unit = {
-    Console.err.println(s"\nHelp: $description")
+  // scalastyle:off println
+  def help(): Unit = {
+    Console.println(s"\nHelp: $description")
     var usage = List.empty[String]
-    options.map(kv => if(kv._2.required) {
+    options.map(kv => if (kv._2.required) {
       usage = usage :+ s"-${kv._1} (required:${kv._2.required})${kv._2.description}"
     } else {
-      usage = usage :+ s"-${kv._1} (required:${kv._2.required}, default:${kv._2.defaultValue.getOrElse("")})${kv._2.description}"
+      usage = usage :+ s"-${kv._1} (required:${kv._2.required}, " +
+        s"default:${kv._2.defaultValue.getOrElse("")})${kv._2.description}"
     })
     usage :+= remainArgs.map(k => s"<$k>").mkString(" ")
-    usage.foreach(Console.err.println(_))
+    usage.foreach(Console.println(_))
   }
+  // scalastyle:on println
 
   def parse(args: Array[String]): ParseResult = {
     val syntax = Syntax(options, remainArgs, ignoreUnknownArgument)
@@ -60,16 +65,18 @@ trait  ArgumentsParser {
   }
 
   val description: String = ""
-  val options : Array[(String, CLIOption[Any])] = Array.empty[(String, CLIOption[Any])]
-  val remainArgs : Array[String] = Array.empty[String]
+  val options: Array[(String, CLIOption[Any])] = Array.empty[(String, CLIOption[Any])]
+  val remainArgs: Array[String] = Array.empty[String]
 }
 
 object ArgumentsParser {
 
-  case class Syntax(val options: Array[(String, CLIOption[Any])], val remainArgs : Array[String], val ignoreUnknownArgument: Boolean)
+  case class Syntax(
+      val options: Array[(String, CLIOption[Any])], val remainArgs: Array[String],
+      val ignoreUnknownArgument: Boolean)
 
   def parse(syntax: Syntax, args: Array[String]): ParseResult = {
-    import syntax.{options, remainArgs, ignoreUnknownArgument}
+    import syntax.{ignoreUnknownArgument, options, remainArgs}
     var config = Map.empty[String, String]
     var remain = Array.empty[String]
 
@@ -100,14 +107,16 @@ object ArgumentsParser {
           doParse(rest)
 
         case value :: rest =>
+          // scalastyle:off println
           Console.err.println(s"Warning: get unknown argument $value, maybe it is a main class")
+          // scalastyle:on println
           remain ++= value :: rest
           doParse(Nil)
       }
     }
     doParse(args.toList)
 
-    options.foreach{pair =>
+    options.foreach { pair =>
       val (key, option) = pair
       if (!config.contains(key) && !option.required) {
         config += key -> option.defaultValue.getOrElse("").toString

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
index d391bba..fb3e5c4 100644
--- a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
+++ b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
@@ -19,36 +19,36 @@
 package io.gearpump.cluster.master
 
 import java.util.concurrent.{TimeUnit, TimeoutException}
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success}
 
 import akka.actor.{Actor, ActorRef, Props, _}
 import com.typesafe.config.Config
-import io.gearpump.cluster.{AppJar, AppDescription}
+import org.slf4j.Logger
+
 import io.gearpump.cluster.AppMasterToMaster.RequestResource
 import io.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
 import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
 import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
 import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.{WorkerInfo, AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
+import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo, WorkerInfo}
 import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
+import io.gearpump.cluster.{AppDescription, AppJar, _}
 import io.gearpump.transport.HostPort
 import io.gearpump.util.ActorSystemBooter._
 import io.gearpump.util.Constants._
 import io.gearpump.util.{ActorSystemBooter, ActorUtil, LogUtil, Util}
-import org.slf4j.Logger
 
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success}
-import io.gearpump.WorkerId
 /**
  *
  * AppMasterLauncher is a child Actor of AppManager, it is responsible
  * to launch the AppMaster on the cluster.
  */
 class AppMasterLauncher(
-    appId : Int, executorId: Int, app : AppDescription,
-    jar: Option[AppJar], username : String, master : ActorRef, client: Option[ActorRef])
+    appId: Int, executorId: Int, app: AppDescription,
+    jar: Option[AppJar], username: String, master: ActorRef, client: Option[ActorRef])
   extends Actor {
   private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
 
@@ -61,9 +61,9 @@ class AppMasterLauncher(
   LOG.info(s"Ask Master resource to start AppMaster $appId...")
   master ! RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))
 
-  def receive : Receive = waitForResourceAllocation
+  def receive: Receive = waitForResourceAllocation
 
-  def waitForResourceAllocation : Receive = {
+  def waitForResourceAllocation: Receive = {
     case ResourceAllocated(allocations) =>
 
       val ResourceAllocation(resource, worker, workerId) = allocations(0)
@@ -74,22 +74,27 @@ class AppMasterLauncher(
       val appMasterInfo = AppMasterRuntimeInfo(appId, app.name, worker, username,
         submissionTime, config = appMasterAkkaConfig)
       val workerInfo = WorkerInfo(workerId, worker)
-      val appMasterContext = AppMasterContext(appId, username, resource, workerInfo, jar, null, appMasterInfo)
+      val appMasterContext =
+        AppMasterContext(appId, username, resource, workerInfo, jar, null, appMasterInfo)
       LOG.info(s"Try to launch a executor for AppMaster on worker ${workerId} for app $appId")
       val name = ActorUtil.actorNameForExecutor(appId, executorId)
       val selfPath = ActorUtil.getFullPath(context.system, self.path)
 
-      val jvmSetting = Util.resolveJvmSetting(appMasterAkkaConfig.withFallback(systemConfig)).appMater
-      val executorJVM = ExecutorJVMConfig(jvmSetting.classPath ,jvmSetting.vmargs,
-        classOf[ActorSystemBooter].getName, Array(name, selfPath), jar, username, appMasterAkkaConfig)
+      val jvmSetting =
+        Util.resolveJvmSetting(appMasterAkkaConfig.withFallback(systemConfig)).appMater
+      val executorJVM = ExecutorJVMConfig(jvmSetting.classPath, jvmSetting.vmargs,
+        classOf[ActorSystemBooter].getName, Array(name, selfPath), jar,
+        username, appMasterAkkaConfig)
 
       worker ! LaunchExecutor(appId, executorId, resource, executorJVM)
       context.become(waitForActorSystemToStart(worker, appMasterContext, app.userConfig, resource))
   }
 
-  def waitForActorSystemToStart(worker : ActorRef, appContext : AppMasterContext, user : UserConfig, resource: Resource) : Receive = {
+  def waitForActorSystemToStart(
+      worker: ActorRef, appContext: AppMasterContext, user: UserConfig, resource: Resource)
+    : Receive = {
     case ExecutorLaunchRejected(reason, ex) =>
-      LOG.error(s"Executor Launch failed reason:$reason", ex)
+      LOG.error(s"Executor Launch failed reason: $reason", ex)
       LOG.info(s"reallocate resource $resource to start appmaster")
       master ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
       context.become(waitForResourceAllocation)
@@ -99,7 +104,8 @@ class AppMasterLauncher(
 
       val masterAddress = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS)
         .asScala.map(HostPort(_)).map(ActorUtil.getMasterActorPath)
-      sender ! CreateActor(AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), s"appdaemon$appId")
+      sender ! CreateActor(
+        AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), s"appdaemon$appId")
 
       import context.dispatcher
       val appMasterTimeout = scheduler.scheduleOnce(TIMEOUT, self,
@@ -107,7 +113,7 @@ class AppMasterLauncher(
       context.become(waitForAppMasterToStart(worker, appMasterTimeout))
   }
 
-  def waitForAppMasterToStart(worker : ActorRef, cancel: Cancellable) : Receive = {
+  def waitForAppMasterToStart(worker: ActorRef, cancel: Cancellable): Receive = {
     case ActorCreated(appMaster, _) =>
       cancel.cancel()
       sender ! BindLifeCycle(appMaster)
@@ -121,19 +127,21 @@ class AppMasterLauncher(
       context.stop(self)
   }
 
-  def replyToClient(result : SubmitApplicationResult) : Unit = {
+  def replyToClient(result: SubmitApplicationResult): Unit = {
     if (client.isDefined) {
       client.get.tell(result, master)
     }
   }
 }
 
-object AppMasterLauncher extends AppMasterLauncherFactory{
-  def props(appId : Int, executorId: Int, app : AppDescription, jar: Option[AppJar], username : String, master : ActorRef, client: Option[ActorRef]) = {
+object AppMasterLauncher extends AppMasterLauncherFactory {
+  def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+      username: String, master: ActorRef, client: Option[ActorRef]): Props = {
     Props(new AppMasterLauncher(appId, executorId, app, jar, username, master, client))
   }
 }
 
 trait AppMasterLauncherFactory {
-  def props(appId : Int, executorId: Int, app : AppDescription, jar: Option[AppJar], username : String, master : ActorRef, client: Option[ActorRef]) : Props
+  def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+      username: String, master: ActorRef, client: Option[ActorRef]): Props
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
index 5d9e410..61d95dc 100644
--- a/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
+++ b/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,24 +18,21 @@
 
 package io.gearpump.cluster.master
 
+import scala.concurrent.duration.FiniteDuration
 
 import akka.actor._
-import io.gearpump.transport.HostPort
-import io.gearpump.util.{ActorUtil, LogUtil}
 import org.slf4j.Logger
 
-import scala.concurrent.duration.{FiniteDuration, Duration}
+import io.gearpump.transport.HostPort
+import io.gearpump.util.{ActorUtil, LogUtil}
 
 /**
  * This works with Master HA. When there are multiple Master nodes,
  * This will find a active one.
- *
- *
- * @param masters
  */
-class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration)
+class MasterProxy(masters: Iterable[ActorPath], timeout: FiniteDuration)
   extends Actor with Stash {
-  import MasterProxy._
+  import io.gearpump.cluster.master.MasterProxy._
 
   val LOG: Logger = LogUtil.getLogger(getClass, name = self.path.name)
 
@@ -48,10 +45,12 @@ class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration)
 
   import context.dispatcher
 
-  def findMaster() = repeatActionUtil(timeout){
-    contacts foreach { contact =>
-      LOG.info(s"sending identity to $contact")
-      contact ! Identify(None)
+  def findMaster(): Cancellable = {
+    repeatActionUtil(timeout) {
+      contacts foreach { contact =>
+        LOG.info(s"sending identity to $contact")
+        contact ! Identify(None)
+      }
     }
   }
 
@@ -64,11 +63,11 @@ class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration)
     super.postStop()
   }
 
-  override def receive : Receive = {
-    case _=>
+  override def receive: Receive = {
+    case _ =>
   }
 
-  def establishing(findMaster : Cancellable): Actor.Receive = {
+  def establishing(findMaster: Cancellable): Actor.Receive = {
     case ActorIdentity(_, Some(receptionist)) =>
       context watch receptionist
       LOG.info("Connected to [{}]", receptionist.path)
@@ -85,10 +84,10 @@ class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration)
   }
 
   def active(receptionist: ActorRef): Actor.Receive = {
-    case Terminated(receptionist) ⇒
+    case Terminated(receptionist) =>
       LOG.info("Lost contact with [{}], restablishing connection", receptionist)
       context.become(establishing(findMaster))
-    case _: ActorIdentity ⇒ // ok, from previous establish, already handled
+    case _: ActorIdentity => // ok, from previous establish, already handled
     case WatchMaster(watcher) =>
       watchers = watchers :+ watcher
   }
@@ -99,10 +98,10 @@ class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration)
       master forward msg
   }
 
-  def scheduler = context.system.scheduler
+  def scheduler: Scheduler = context.system.scheduler
   import scala.concurrent.duration._
-  private def repeatActionUtil(timeout: FiniteDuration)(action : => Unit) : Cancellable = {
-    val send = scheduler.schedule(0 seconds, 2 seconds)(action)
+  private def repeatActionUtil(timeout: FiniteDuration)(action: => Unit): Cancellable = {
+    val send = scheduler.schedule(0.seconds, 2.seconds)(action)
     val suicide = scheduler.scheduleOnce(timeout) {
       send.cancel()
       self ! PoisonPill
@@ -128,7 +127,7 @@ object MasterProxy {
   case class WatchMaster(watcher: ActorRef)
 
   import scala.concurrent.duration._
-  def props(masters: Iterable[HostPort], duration: FiniteDuration = 30 seconds): Props = {
+  def props(masters: Iterable[HostPort], duration: FiniteDuration = 30.seconds): Props = {
     val contacts = masters.map(ActorUtil.getMasterActorPath(_))
     Props(new MasterProxy(contacts, duration))
   }