Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:36 UTC

[26/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
index a3f2fc5..df85017 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,17 +19,21 @@
 package io.gearpump.streaming.appmaster
 
 import java.lang.management.ManagementFactory
+import scala.concurrent.Future
 
 import akka.actor._
+import org.slf4j.Logger
+
 import io.gearpump._
 import io.gearpump.cluster.ClientToMaster.{GetLastFailure, GetStallingTasks, QueryHistoryMetrics, ShutdownApplication}
 import io.gearpump.cluster.MasterToAppMaster.{AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge}
-import io.gearpump.cluster.MasterToClient.{HistoryMetricsItem, HistoryMetrics, LastFailure}
+import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, LastFailure}
 import io.gearpump.cluster._
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.metrics.Metrics.ReportMetrics
 import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
 import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.ExecutorToAppMaster.{UnRegisterTask, MessageLoss, RegisterExecutor, RegisterTask}
+import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterExecutor, RegisterTask, UnRegisterTask}
 import io.gearpump.streaming._
 import io.gearpump.streaming.appmaster.AppMaster._
 import io.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, LatestDAG, ReplaceProcessor}
@@ -42,30 +46,27 @@ import io.gearpump.streaming.util.ActorPathUtil
 import io.gearpump.util.Constants.{APPMASTER_DEFAULT_EXECUTOR_ID, _}
 import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
 import io.gearpump.util._
-import org.slf4j.Logger
-
-import scala.concurrent.Future
 
 /**
  * AppMaster is the head of a streaming application.
  *
  * It contains:
- * 1. ExecutorManager to manage all executors.
- * 2. TaskManager to manage all tasks,
- * 3. ClockService to track the global clock for this streaming application.
- * 4. Scheduler to decide which a task should be scheduled to.
- *
+ *  1. ExecutorManager to manage all executors.
+ *  2. TaskManager to manage all tasks.
+ *  3. ClockService to track the global clock for this streaming application.
+ *  4. Scheduler to decide where a task should be scheduled.
  */
-class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends ApplicationMaster {
+class AppMaster(appContext: AppMasterContext, app: AppDescription) extends ApplicationMaster {
   import app.userConfig
   import appContext.{appId, masterProxy, username}
 
-  implicit val actorSystem = context.system
-  implicit val timeOut = FUTURE_TIMEOUT
+  private implicit val actorSystem = context.system
+  private implicit val timeOut = FUTURE_TIMEOUT
+
   import akka.pattern.ask
-  implicit val dispatcher = context.dispatcher
+  private implicit val dispatcher = context.dispatcher
 
-  val startTime: TimeStamp = System.currentTimeMillis()
+  private val startTime: TimeStamp = System.currentTimeMillis()
 
   private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
   LOG.info(s"AppMaster[$appId] is launched by $username, app: $app xxxxxxxxxxxxxxxxx")
@@ -83,11 +84,12 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
   private var lastFailure = LastFailure(0L, null)
 
   private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID,
-    self.path.toString, Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active")
+    self.path.toString,
+    Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active")
 
   private val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
 
-  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+  private val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
 
   private val userDir = System.getProperty("user.dir")
   private val logFile = LogUtil.applicationLogDir(actorSystem.settings.config)
@@ -104,12 +106,15 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
   )
 
   private val historyMetricsService = if (metricsEnabled) {
-    // register jvm metrics
-    Metrics(context.system).register(new JvmMetricsSet(s"app${appId}.executor${APPMASTER_DEFAULT_EXECUTOR_ID}"))
+    // Registers JVM metrics
+    Metrics(context.system).register(new JvmMetricsSet(
+      s"app${appId}.executor${APPMASTER_DEFAULT_EXECUTOR_ID}"))
 
-    val historyMetricsService = context.actorOf(Props(new HistoryMetricsService(s"app$appId", getHistoryMetricsConfig)))
+    val historyMetricsService = context.actorOf(Props(new HistoryMetricsService(
+      s"app$appId", getHistoryMetricsConfig)))
 
-    val metricsReportService = context.actorOf(Props(new MetricsReporterService(Metrics(context.system))))
+    val metricsReportService = context.actorOf(Props(
+      new MetricsReporterService(Metrics(context.system))))
     historyMetricsService.tell(ReportMetrics, metricsReportService)
 
     Some(historyMetricsService)
@@ -129,13 +134,15 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
       jarScheduler, executorManager, clockService.get, self, app.name))))
   }
 
-  override def receive : Receive =
-      taskMessageHandler orElse
+  override def receive: Receive = {
+    taskMessageHandler orElse
       executorMessageHandler orElse
       recover orElse
       appMasterService orElse
       ActorUtil.defaultMsgHandler(self)
+  }
 
+  /** Handles messages from Tasks */
   def taskMessageHandler: Receive = {
     case clock: ClockEvent =>
       taskManager.foreach(_ forward clock)
@@ -143,7 +150,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
       taskManager.foreach(_ forward register)
     case unRegister: UnRegisterTask =>
       taskManager.foreach(_ forward unRegister)
-      // check whether this processor dead, if it is, then we should remove it from clockService.
+      // Checks whether this processor is dead; if so, removes it from clockService.
       clockService.foreach(_ forward CheckProcessorDeath(unRegister.taskId.processorId))
     case replay: ReplayFromTimestampWindowTrailingEdge =>
       taskManager.foreach(_ forward replay)
@@ -163,6 +170,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
       clockService.foreach(_ forward GetCheckpointClock)
   }
 
+  /** Handles messages from Executors */
   def executorMessageHandler: Receive = {
     case register: RegisterExecutor =>
       executorManager forward register
@@ -170,6 +178,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
       historyMetricsService.foreach(_ forward ReportMetrics)
   }
 
+  /** Handles service requests sent to AppMaster, e.g. queries from clients such as the UI */
   def appMasterService: Receive = {
     case appMasterDataDetailRequest: AppMasterDataDetailRequest =>
       LOG.debug(s"AppMaster got AppMasterDataDetailRequest for $appId ")
@@ -179,14 +188,17 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
       val taskFuture = getTaskList
       val dagFuture = getDAG
 
-      val appMasterDataDetail = for {executors <- executorsFuture
+      val appMasterDataDetail = for {
+        executors <- executorsFuture
         clock <- clockFuture
         tasks <- taskFuture
         dag <- dagFuture
       } yield {
         val graph = dag.graph
 
-        val executorToTasks = tasks.tasks.groupBy(_._2).mapValues {_.keys.toList}
+        val executorToTasks = tasks.tasks.groupBy(_._2).mapValues {
+          _.keys.toList
+        }
 
         val processors = dag.processors.map { kv =>
           val processor = kv._2
@@ -195,7 +207,8 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
             (kv._1, TaskCount(kv._2.count(_.processorId == id)))
           }.filter(_._2.count != 0)
           (id,
-          ProcessorSummary(id, taskClass, parallelism, description, taskConf, life, tasks.keys.toList, tasks))
+            ProcessorSummary(id, taskClass, parallelism, description, taskConf, life,
+              tasks.keys.toList, tasks))
         }
 
         StreamAppMasterSummary(
@@ -211,7 +224,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
           logFile = logFile.getAbsolutePath,
           processors = processors,
           processorLevels = graph.vertexHierarchyLevelMap(),
-          dag = graph.mapEdge {(node1, edge, node2) =>
+          dag = graph.mapEdge { (node1, edge, node2) =>
             edge.partitionerFactory.name
           },
           executors = executors,
@@ -221,16 +234,16 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
 
       val client = sender()
 
-      appMasterDataDetail.map{appData =>
+      appMasterDataDetail.map { appData =>
         client ! appData
       }
-// TODO: WebSocket is buggy and disabled.
-//    case appMasterMetricsRequest: AppMasterMetricsRequest =>
-//      val client = sender()
-//      actorSystem.eventStream.subscribe(client, classOf[MetricType])
+    // TODO: WebSocket is buggy and disabled.
+    //    case appMasterMetricsRequest: AppMasterMetricsRequest =>
+    //      val client = sender()
+    //      actorSystem.eventStream.subscribe(client, classOf[MetricType])
     case query: QueryHistoryMetrics =>
       if (historyMetricsService.isEmpty) {
-        // return empty metrics so that we don't hang the UI
+        // Returns empty metrics so that we don't hang the UI
         sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
       } else {
         historyMetricsService.get forward query
@@ -241,34 +254,37 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
       dagManager forward replaceDAG
     case GetLastFailure(_) =>
       sender ! lastFailure
-    case  get@ GetExecutorSummary(executorId) =>
+    case get@GetExecutorSummary(executorId) =>
       val client = sender
       if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) {
         client ! appMasterExecutorSummary
       } else {
-        ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo).map { map =>
-          map.get(executorId).foreach { executor =>
-            executor.executor.tell(get, client)
+        ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
+          .map { map =>
+            map.get(executorId).foreach { executor =>
+              executor.executor.tell(get, client)
+            }
           }
-        }
       }
-    case query@ QueryExecutorConfig(executorId) =>
+    case query@QueryExecutorConfig(executorId) =>
       val client = sender
       if (executorId == -1) {
         val systemConfig = context.system.settings.config
         sender ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
       } else {
-        ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo).map { map =>
-          map.get(executorId).foreach { executor =>
-            executor.executor.tell(query, client)
+        ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
+          .map { map =>
+            map.get(executorId).foreach { executor =>
+              executor.executor.tell(query, client)
+            }
           }
-        }
       }
-   }
+  }
 
+  /** Error handling */
   def recover: Receive = {
     case FailedToRecover(errorMsg) =>
-      if(context.children.toList.contains(sender())){
+      if (context.children.toList.contains(sender())) {
         LOG.error(errorMsg)
         masterProxy ! ShutdownApplication(appId)
       }
@@ -288,14 +304,15 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
   }
 
   private def executorBrief: Future[List[ExecutorBrief]] = {
-    ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo).map { infos =>
-      infos.values.map { info =>
-        ExecutorBrief(info.executorId,
-          info.executor.path.toSerializationFormat,
-          info.worker.workerId,
-          "active")
-      }.toList :+ appMasterBrief
-    }
+    ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
+      .map { infos =>
+        infos.values.map { info =>
+          ExecutorBrief(info.executorId,
+            info.executor.path.toSerializationFormat,
+            info.worker.workerId,
+            "active")
+        }.toList :+ appMasterBrief
+      }
   }
 
   private def getTaskList: Future[TaskList] = {
@@ -312,10 +329,11 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
   }
 
   private def getUpdatedDAG(): DAG = {
-    val dag = DAG(userConfig.getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get)
-    val updated = dag.processors.map{ idAndProcessor =>
+    val dag = DAG(userConfig.getValue[Graph[ProcessorDescription,
+      PartitionerDescription]](StreamApplication.DAG).get)
+    val updated = dag.processors.map { idAndProcessor =>
       val (id, oldProcessor) = idAndProcessor
-      val newProcessor = if(oldProcessor.jar == null) {
+      val newProcessor = if (oldProcessor.jar == null) {
         oldProcessor.copy(jar = appContext.appJar.getOrElse(null))
       } else {
         oldProcessor
@@ -327,14 +345,18 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
 }
 
 object AppMaster {
+
+  /** Master node doesn't return resources in time */
   case object AllocateResourceTimeOut
 
+  /** Queries task ActorRef by providing the taskId */
   case class LookupTaskActorRef(taskId: TaskId)
 
   case class TaskActorRef(task: ActorRef)
 
   class ServiceNotAvailableException(reason: String) extends Exception(reason)
 
-  case class ExecutorBrief(executorId: ExecutorId, executor: String, workerId: WorkerId, status: String)
+  case class ExecutorBrief(
+      executorId: ExecutorId, executor: String, workerId: WorkerId, status: String)
 
 }
\ No newline at end of file
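
For readers skimming the diff above: the rewritten receive chains several
Receive handlers with orElse. Receive is just a PartialFunction[Any, Unit],
so a message is handled by the first handler defined for it and falls
through otherwise. A minimal, self-contained sketch of the pattern (actor
and handler names are illustrative, not Gearpump code):

    import akka.actor.Actor

    // Sketch only: Receive handlers compose with orElse, the same pattern
    // AppMaster.receive uses above.
    class ComposedActor extends Actor {
      def taskHandler: Receive = {
        case s: String => println(s"task message: $s")
      }
      def executorHandler: Receive = {
        case n: Int => println(s"executor message: $n")
      }
      def fallback: Receive = {
        case other => println(s"unhandled: $other")
      }
      override def receive: Receive = {
        taskHandler orElse executorHandler orElse fallback
      }
    }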

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
index 67fd592..bd18bdc 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,45 +21,47 @@ package io.gearpump.streaming.appmaster
 import java.util
 import java.util.Date
 import java.util.concurrent.TimeUnit
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.language.implicitConversions
 
 import akka.actor.{Actor, Cancellable, Stash}
+import org.slf4j.Logger
+
+import io.gearpump.TimeStamp
+import io.gearpump.cluster.ClientToMaster.GetStallingTasks
 import io.gearpump.google.common.primitives.Longs
-import io.gearpump.streaming.task._
+import io.gearpump.streaming.AppMasterToMaster.StallingTasks
 import io.gearpump.streaming._
+import io.gearpump.streaming.appmaster.ClockService.HealthChecker.ClockValue
+import io.gearpump.streaming.appmaster.ClockService._
 import io.gearpump.streaming.storage.AppDataStore
-import io.gearpump.TimeStamp
-import io.gearpump.cluster.ClientToMaster.GetStallingTasks
-import AppMasterToMaster.StallingTasks
-import ClockService.HealthChecker.ClockValue
-import ClockService._
+import io.gearpump.streaming.task._
 import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-
-import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
-import scala.language.implicitConversions
 
 /**
- * The clockService will maintain a global view of message timestamp in the application
+ * Maintains a global view of message timestamps in the application
  */
-class ClockService(private var dag : DAG, store: AppDataStore) extends Actor with Stash {
+class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with Stash {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
   import context.dispatcher
 
   private val healthChecker = new HealthChecker(stallingThresholdSeconds = 60)
-  private var healthCheckScheduler : Cancellable = null
-  private var snapshotScheduler : Cancellable = null
+  private var healthCheckScheduler: Cancellable = null
+  private var snapshotScheduler: Cancellable = null
 
-  override def receive = null
+  override def receive: Receive = null
 
-  override def preStart() : Unit = {
+  override def preStart(): Unit = {
     LOG.info("Initializing Clock service, get snapshotted StartClock ....")
     store.get(START_CLOCK).asInstanceOf[Future[TimeStamp]].map { clock =>
       val startClock = Option(clock).getOrElse(0L)
 
       minCheckpointClock = Some(startClock)
 
+      // Recovers the application by restarting from the last persisted startClock.
+      // Only messages after startClock will be replayed.
       self ! StoredStartClock(startClock)
       LOG.info(s"Start Clock Retrieved, starting ClockService, startClock: $startClock")
     }
@@ -67,12 +69,15 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
     context.become(waitForStartClock)
   }
 
-  override def postStop() : Unit = {
+  override def postStop(): Unit = {
     Option(healthCheckScheduler).map(_.cancel)
     Option(snapshotScheduler).map(_.cancel)
   }
 
-  var clocks = Map.empty[ProcessorId, ProcessorClock]
+  // Keeps track of the clock values of all processors.
+  private var clocks = Map.empty[ProcessorId, ProcessorClock]
+
+  // Each processor can have multiple upstream processors. This keeps track of upstream clocks.
   private var upstreamClocks = Map.empty[ProcessorId, Array[ProcessorClock]]
 
   // We use Array instead of List for Performance consideration
@@ -106,12 +111,12 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
   private def recoverDag(dag: DAG, startClock: TimeStamp): Unit = {
     this.clocks = dag.processors.filter(startClock < _._2.life.death).
       map { pair =>
-      val (processorId, processor) = pair
-      val parallelism = processor.parallelism
-      val clock = new ProcessorClock(processorId, processor.life, parallelism)
-      clock.init(startClock)
-      (processorId, clock)
-    }
+        val (processorId, processor) = pair
+        val parallelism = processor.parallelism
+        val clock = new ProcessorClock(processorId, processor.life, parallelism)
+        clock.init(startClock)
+        (processorId, clock)
+      }
 
     this.upstreamClocks = clocks.map { pair =>
       val (processorId, processor) = pair
@@ -150,7 +155,7 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
       (processorId, upstreamClocks.toArray)
     }
 
-    // init the clock of all processors.
+    // Initializes the clocks of all processors.
     newClocks.map { pair =>
       val (processorId, processorClock) = pair
       val upstreamClock = getUpStreamMinClock(processorId)
@@ -174,11 +179,12 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
 
       import context.dispatcher
 
-      //period report current clock
-      healthCheckScheduler = context.system.scheduler.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
+      // Periodically reports the current clock
+      healthCheckScheduler = context.system.scheduler.schedule(
+        new FiniteDuration(5, TimeUnit.SECONDS),
         new FiniteDuration(60, TimeUnit.SECONDS), self, HealthCheck)
 
-      //period snpashot latest min startclock to external storage
+      // Periodically snapshots the latest min startClock to external storage
       snapshotScheduler = context.system.scheduler.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
         new FiniteDuration(5, TimeUnit.SECONDS), self, SnapshotStartClock)
 
@@ -206,7 +212,7 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
     case GetUpstreamMinClock(task) =>
       sender ! UpstreamMinClock(getUpStreamMinClock(task.processorId))
 
-    case update@ UpdateClock(task, clock) =>
+    case update@UpdateClock(task, clock) =>
       val upstreamMinClock = getUpStreamMinClock(task.processorId)
 
       val processorClock = clocks.get(task.processorId)
@@ -233,7 +239,8 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
           LOG.info(s"Removing $processorId from clock service...")
           removeProcessor(processorId)
         } else {
-          LOG.info(s"Unsuccessfully in removing $processorId from clock service..., min: ${processorClock.get.min}, life: $life")
+          LOG.info(s"Failed to remove $processorId from clock service," +
+            s" min: ${processorClock.get.min}, life: $life")
         }
       }
     case HealthCheck =>
@@ -253,15 +260,15 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
 
     case ChangeToNewDAG(dag) =>
       if (dag.version > this.dag.version) {
-        // transit to a new dag version
+        // Transits to a new dag version
         this.dag = dag
         dynamicDAG(dag, getStartClock)
       } else {
-        // restart current dag.
+        // Restarts current dag.
         recoverDag(dag, getStartClock)
       }
-     LOG.info(s"Change to new DAG(dag = ${dag.version}), send back ChangeToNewDAGSuccess")
-      sender ! ChangeToNewDAGSuccess(clocks.map{ pair =>
+      LOG.info(s"Change to new DAG(dag = ${dag.version}), send back ChangeToNewDAGSuccess")
+      sender ! ChangeToNewDAGSuccess(clocks.map { pair =>
         val (id, clock) = pair
         (id, clock.min)
       })
@@ -279,7 +286,7 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
 
     upstreamClocks = upstreamClocks - processorId
 
-    // remove dead processor from checkpoints.
+    // Removes dead processor from checkpoints.
     checkpointClocks = checkpointClocks.filter { kv =>
       val (taskId, processor) = kv
       taskId.processorId != processorId
@@ -290,12 +297,13 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
     ProcessorClocks.minClock(processorClocks)
   }
 
-  def selfCheck() : Unit = {
+  def selfCheck(): Unit = {
     val minTimestamp = minClock
 
     if (Long.MaxValue == minTimestamp) {
       processorClocks.foreach { clock =>
-        LOG.info(s"Processor ${clock.processorId} Clock: min: ${clock.min}, taskClocks: "+ clock.taskClocks.mkString(","))
+        LOG.info(s"Processor ${clock.processorId} Clock: min: ${clock.min}, " +
+          s"taskClocks: " + clock.taskClocks.mkString(","))
       }
     }
 
@@ -306,7 +314,7 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
     minCheckpointClock.getOrElse(minClock)
   }
 
-  private def snapshotStartClock() : Unit = {
+  private def snapshotStartClock(): Unit = {
     store.put(START_CLOCK, getStartClock)
   }
 
@@ -329,10 +337,11 @@ object ClockService {
   case object HealthCheck
 
   class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallism: Int,
-    private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) {
-
+      private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) {
 
-    def copy(life: LifeTime): ProcessorClock = new ProcessorClock(processorId, life, parallism, _min, _taskClocks)
+    def copy(life: LifeTime): ProcessorClock = {
+      new ProcessorClock(processorId, life, parallism, _min, _taskClocks)
+    }
 
     def min: TimeStamp = _min
     def taskClocks: Array[TimeStamp] = _taskClocks
@@ -362,17 +371,22 @@ object ClockService {
     private val LOG: Logger = LogUtil.getLogger(getClass)
 
     private var minClock: ClockValue = null
-    private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000 // 60 seconds
+    // 60 seconds
+    private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000
     private var stallingTasks = Array.empty[TaskId]
 
-    def check(currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock], dag: DAG, now: TimeStamp): Unit = {
+    /** Checks for stalling tasks */
+    def check(
+        currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock],
+        dag: DAG, now: TimeStamp): Unit = {
       var isClockStalling = false
       if (null == minClock || currentMinClock > minClock.appClock) {
         minClock = ClockValue(systemClock = now, appClock = currentMinClock)
       } else {
-        //clock not advancing
+        // Clock not advancing
         if (now > minClock.systemClock + stallingThresholdMilliseconds) {
-          LOG.warn(s"Clock has not advanced for ${(now - minClock.systemClock)/1000} seconds since ${minClock.prettyPrint}...")
+          LOG.warn(s"Clock has not advanced for ${(now - minClock.systemClock) / 1000} seconds " +
+            s"since ${minClock.prettyPrint}...")
           isClockStalling = true
         }
       }
@@ -387,7 +401,7 @@ object ClockService {
           }
         }
 
-        processorId.foreach {processorId =>
+        processorId.foreach { processorId =>
           val processorClock = processorClocks(processorId)
           val taskClocks = processorClock.taskClocks
           stallingTasks = taskClocks.zipWithIndex.filter(_._1 == minClock.appClock).
@@ -414,10 +428,11 @@ object ClockService {
 
   object ProcessorClocks {
 
+    // Gets the min clock of all processors
     def minClock(clock: Array[ProcessorClock]): TimeStamp = {
       var i = 0
       var min = if (clock.length == 0) 0L else clock(0).min
-      while(i < clock.length) {
+      while (i < clock.length) {
         min = Math.min(min, clock(i).min)
         i += 1
       }
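
The ProcessorClocks.minClock helper at the end of this diff is the heart of
the global clock view: the application clock can only advance to the minimum
over all processor clocks, and a plain while loop over an Array is used for
performance, as the code comments note. A self-contained sketch of the same
computation, with ProcessorClock stripped down to the one field needed here
(everything else is illustrative, not Gearpump code):

    // Sketch: minimum clock over processors, mirroring ProcessorClocks.minClock.
    object MinClockSketch {
      // Simplified stand-in for ClockService.ProcessorClock; only `min` is kept.
      final class ProcessorClock(val min: Long)

      def minClock(clocks: Array[ProcessorClock]): Long = {
        var i = 0
        var min = if (clocks.length == 0) 0L else clocks(0).min
        while (i < clocks.length) {
          min = Math.min(min, clocks(i).min)
          i += 1
        }
        min
      }

      def main(args: Array[String]): Unit = {
        println(minClock(Array(new ProcessorClock(5L), new ProcessorClock(3L)))) // 3
      }
    }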

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala
index 7244f95..f18b387 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala
@@ -18,44 +18,48 @@
 
 package io.gearpump.streaming.appmaster
 
-import akka.actor.{ActorRef, Actor, Stash}
-import io.gearpump.streaming._
-import io.gearpump.streaming.task.Subscriber
-import io.gearpump.streaming.storage.AppDataStore
-import io.gearpump.cluster.UserConfig
-import io.gearpump.partitioner.PartitionerDescription
-import DagManager._
-import io.gearpump.util.{LogUtil, Graph}
+import scala.concurrent.Future
+
+import akka.actor.{Actor, ActorRef, Stash}
 import org.slf4j.Logger
 
-import scala.concurrent.Future
+import io.gearpump.cluster.UserConfig
+import io.gearpump.partitioner.PartitionerDescription
+import io.gearpump.streaming._
+import io.gearpump.streaming.appmaster.DagManager._
+import io.gearpump.streaming.storage.AppDataStore
+import io.gearpump.streaming.task.Subscriber
+import io.gearpump.util.{Graph, LogUtil}
 
 /**
- * Will handle dag modification and other stuff related with DAG
-  */
-
+ * Handles DAG modification and other DAG-related operations.
+ *
+ * DagManager maintains multiple versions of the DAG. Each version is immutable.
+ * For operations like modifying a processor, it creates a new DAG version.
+ */
 class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: Option[DAG])
-    extends Actor with Stash {
+  extends Actor with Stash {
+
   import context.dispatcher
   private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
   private val NOT_INITIALIZED = -1
 
   private var dags = List.empty[DAG]
   private var maxProcessorId = -1
-  implicit val system = context.system
+  private implicit val system = context.system
 
   private var watchers = List.empty[ActorRef]
 
   override def receive: Receive = null
 
-  override def preStart() : Unit = {
+  override def preStart(): Unit = {
     LOG.info("Initializing Dag Service, get stored Dag ....")
     store.get(StreamApplication.DAG).asInstanceOf[Future[DAG]].map { storedDag =>
       if (storedDag != null) {
         dags :+= storedDag
       } else {
         dags :+= dag.getOrElse(DAG(userConfig.getValue[Graph[ProcessorDescription,
-            PartitionerDescription]](StreamApplication.DAG).get))
+          PartitionerDescription]](StreamApplication.DAG).get))
       }
       maxProcessorId = {
         val keys = dags.head.processors.keys
@@ -91,20 +95,25 @@ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: O
 
   def dagService: Receive = {
     case GetLatestDAG =>
+      // Gets the latest version of the DAG.
       sender ! LatestDAG(dags.last)
     case GetTaskLaunchData(version, processorId, context) =>
+      // Task information such as the processor class, downstream subscriber processors, etc.
       dags.find(_.version == version).foreach { dag =>
         LOG.info(s"Get task launcher data for processor: $processorId, dagVersion: $version")
         sender ! taskLaunchData(dag, processorId, context)
       }
     case ReplaceProcessor(oldProcessorId, inputNewProcessor) =>
+      // Replaces a processor with a new implementation. The upstream and downstream
+      // processors are NOT changed.
       var newProcessor = inputNewProcessor.copy(id = nextProcessorId)
-      if (inputNewProcessor.jar == null){
+      if (inputNewProcessor.jar == null) {
         val oldJar = dags.last.processors.get(oldProcessorId).get
         newProcessor = newProcessor.copy(jar = oldJar.jar)
       }
       if (dags.length > 1) {
-        sender !  DAGOperationFailed("We are in the process of handling previous dynamic dag change")
+        sender ! DAGOperationFailed(
+          "We are in the process of handling previous dynamic dag change")
       } else {
         val oldDAG = dags.last
         val newVersion = oldDAG.version + 1
@@ -118,21 +127,25 @@ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: O
       }
 
     case WatchChange(watcher) =>
+      // Registers a watcher to be notified of modifications to this DAG.
       if (!this.watchers.contains(watcher)) {
         this.watchers :+= watcher
       }
 
     case NewDAGDeployed(dagVersion) =>
-      // means the new DAG version has been successfully deployed.
-      // remove obsolete versions.
+      // Means the dynamic DAG transition has completed and the new DAG version has been
+      // successfully deployed. Obsolete DAG versions are removed.
       if (dagVersion != NOT_INITIALIZED) {
         dags = dags.filter(_.version == dagVersion)
         store.put(StreamApplication.DAG, dags.last)
       }
   }
 
-  private def replaceDAG(dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int): DAG = {
-    val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth, newProcessor.life.birth)
+  private def replaceDAG(
+      dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int)
+    : DAG = {
+    val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth,
+      newProcessor.life.birth)
 
     val newProcessorMap = dag.processors ++
       Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife),
@@ -153,11 +166,13 @@ object DagManager {
   case class LatestDAG(dag: DAG)
 
   case class GetTaskLaunchData(dagVersion: Int, processorId: Int, context: AnyRef = null)
-  case class TaskLaunchData(processorDescription : ProcessorDescription, subscribers: List[Subscriber], context: AnyRef = null)
+  case class TaskLaunchData(processorDescription : ProcessorDescription,
+      subscribers: List[Subscriber], context: AnyRef = null)
 
   sealed trait DAGOperation
 
-  case class ReplaceProcessor(oldProcessorId: ProcessorId, newProcessorDescription: ProcessorDescription) extends DAGOperation
+  case class ReplaceProcessor(oldProcessorId: ProcessorId,
+      newProcessorDescription: ProcessorDescription) extends DAGOperation
 
   sealed trait DAGOperationResult
   case object DAGOperationSuccess extends DAGOperationResult
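
The new doc comment explains that each DAG version is immutable and that
ReplaceProcessor creates a new version. A toy sketch of that scheme, using
hypothetical, heavily simplified types (the real DAG also adjusts processor
lifetimes, which is omitted here):

    // Hypothetical mini-types to illustrate immutable DAG versioning.
    case class MiniProcessor(id: Int, taskClass: String)
    case class MiniDag(version: Int, processors: Map[Int, MiniProcessor])

    object DagVersioningSketch {
      // Produces a new DAG version; the old value is never mutated.
      def replaceProcessor(dag: MiniDag, oldId: Int, newProc: MiniProcessor): MiniDag =
        dag.copy(
          version = dag.version + 1,
          processors = dag.processors - oldId + (newProc.id -> newProc))

      def main(args: Array[String]): Unit = {
        val v0 = MiniDag(0, Map(1 -> MiniProcessor(1, "Split")))
        val v1 = replaceProcessor(v0, 1, MiniProcessor(2, "SplitV2"))
        println(v0) // version 0, untouched
        println(v1) // version 1, with the replacement processor
      }
    }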

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
index ebe6c11..ab99af5 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,34 +18,32 @@
 
 package io.gearpump.streaming.appmaster
 
+import scala.concurrent.duration._
+import scala.util.{Failure, Try}
+
 import akka.actor.SupervisorStrategy.Stop
 import akka.actor._
 import akka.remote.RemoteScope
 import com.typesafe.config.Config
-import io.gearpump.WorkerId
+import org.apache.commons.lang.exception.ExceptionUtils
+
 import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
 import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems}
 import io.gearpump.cluster.appmaster.WorkerInfo
 import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.cluster.{AppJar, AppMasterContext, ExecutorContext, UserConfig}
 import io.gearpump.streaming.ExecutorId
 import io.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor
 import io.gearpump.streaming.appmaster.ExecutorManager._
 import io.gearpump.streaming.executor.Executor
 import io.gearpump.util.{LogUtil, Util}
-import org.apache.commons.lang.exception.ExceptionUtils
-
-import scala.concurrent.duration._
-import scala.util.Try
 
 /**
- * ExecutorManager manage all executors.
+ * ExecutorManager manages the start and stop of all executors.
  *
 * ExecutorManager will launch Executor when asked. It hides details like starting
- * a new ExecutorSystem from user.
- *
- * Please use ExecutorManager.props() to construct this actor
- *
+ * a new ExecutorSystem from the user. Please use ExecutorManager.props() to construct this actor.
  */
 private[appmaster] class ExecutorManager(
     userConfig: UserConfig,
@@ -60,10 +58,11 @@ private[appmaster] class ExecutorManager(
   import appContext.{appId, masterProxy, username}
 
   private var taskManager: ActorRef = null
-  implicit val actorSystem = context.system
+
+  private implicit val actorSystem = context.system
   private val systemConfig = context.system.settings.config
 
-  private var executors =  Map.empty[Int, ExecutorInfo]
+  private var executors = Map.empty[Int, ExecutorInfo]
 
   def receive: Receive = waitForTaskManager
 
@@ -73,18 +72,26 @@ private[appmaster] class ExecutorManager(
       context.become(service orElse terminationWatch)
   }
 
-  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
+  // If something goes wrong on an executor, ExecutorManager will stop the current executor
+  // and wait for the AppMaster to start a new executor.
+  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10,
+    withinTimeRange = 1.minute) {
     case ex: Throwable =>
       val executorId = Try(sender.path.name.toInt)
       executorId match {
         case scala.util.Success(id) => {
           executors -= id
-          LOG.error(s"Executor $id throws exception, stop it...\n" + ExceptionUtils.getStackTrace(ex))
+          LOG.error(s"Executor $id threw an exception, stopping it...\n" +
+            ExceptionUtils.getStackTrace(ex))
+        }
+        case Failure(ex) => {
+          LOG.error(s"Sender ${sender.path} is dead, but it does not seem to be an executor...")
         }
       }
       Stop
   }
 
+  // Responds to outside queries
   def service: Receive = {
     case StartExecutors(resources, jar) =>
       masterProxy ! StartExecutorSystems(resources, getExecutorJvmConfig(Some(jar)))
@@ -92,30 +99,31 @@ private[appmaster] class ExecutorManager(
       import executorSystem.{address, executorSystemId, resource => executorResource, worker}
 
       val executorId = executorSystemId
-      val executorContext = ExecutorContext(executorId, worker, appId, appName, appMaster = context.parent, executorResource)
+      val executorContext = ExecutorContext(executorId, worker, appId, appName,
+        appMaster = context.parent, executorResource)
       executors += executorId -> ExecutorInfo(executorId, null, worker, boundedJar)
 
-      //start executor
-      val executor = context.actorOf(executorFactory(executorContext, userConfig, address, executorId),
-        executorId.toString)
+      // Starts executor
+      val executor = context.actorOf(executorFactory(executorContext, userConfig,
+        address, executorId), executorId.toString)
       executorSystem.bindLifeCycleWith(executor)
     case StartExecutorSystemTimeout =>
       taskManager ! StartExecutorsTimeOut
 
     case RegisterExecutor(executor, executorId, resource, worker) =>
       LOG.info(s"executor $executorId has been launched")
-      //watch for executor termination
+      // Watches for executor termination
       context.watch(executor)
       val executorInfo = executors.get(executorId).get
       executors += executorId -> executorInfo.copy(executor = executor)
       taskManager ! ExecutorStarted(executorId, resource, worker.workerId, executorInfo.boundedJar)
 
-    // broadcast message to all executors
+    // Broadcasts a message to all executors
     case BroadCast(msg) =>
       LOG.info(s"Broadcast ${msg.getClass.getSimpleName} to all executors")
-      context.children.foreach(_ forward  msg)
+      context.children.foreach(_ forward msg)
 
-    // unicast message to single executor
+    // Unicasts a message to a single executor
     case UniCast(executorId, msg) =>
       LOG.info(s"Unicast ${msg.getClass.getSimpleName} to executor $executorId")
       val executor = executors.get(executorId)
@@ -124,13 +132,14 @@ private[appmaster] class ExecutorManager(
     case GetExecutorInfo =>
       sender ! executors
 
-    // update resource usage, so that reclaim unused resource.
+    // Tells the ExecutorManager which resources are occupied. The ExecutorManager can use
+    // this information to tell the worker to reclaim unused resources.
     case ExecutorResourceUsageSummary(resources) =>
       executors.foreach { pair =>
         val (executorId, executor) = pair
         val resource = resources.get(executorId)
         val worker = executor.worker.ref
-        // notify the worker the actual resource used by this application.
+        // Notifies the worker the actual resource used by this application.
         resource match {
           case Some(resource) =>
             worker ! ChangeExecutorResource(appId, executorId, resource)
@@ -138,9 +147,9 @@ private[appmaster] class ExecutorManager(
             worker ! ChangeExecutorResource(appId, executorId, Resource(0))
         }
       }
-    }
+  }
 
-  def terminationWatch : Receive = {
+  def terminationWatch: Receive = {
     case Terminated(actor) =>
       val executorId = Try(actor.path.name.toInt)
       executorId match {
@@ -149,14 +158,17 @@ private[appmaster] class ExecutorManager(
           LOG.error(s"Executor $id is down")
           taskManager ! ExecutorStopped(id)
         }
-        case scala.util.Failure(ex) => LOG.error(s"failed to get the executor Id from path string ${actor.path}" , ex)
+        case scala.util.Failure(ex) =>
+          LOG.error(s"Failed to get the executor Id from path string ${actor.path}", ex)
       }
   }
 
   private def getExecutorJvmConfig(jar: Option[AppJar]): ExecutorSystemJvmConfig = {
     val executorAkkaConfig = clusterConfig
     val jvmSetting = Util.resolveJvmSetting(executorAkkaConfig.withFallback(systemConfig)).executor
-    ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, jar, username, executorAkkaConfig)
+
+    ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, jar,
+      username, executorAkkaConfig)
   }
 }
 
@@ -168,26 +180,30 @@ private[appmaster] object ExecutorManager {
 
   case object GetExecutorInfo
 
-  case class ExecutorStarted(executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar])
+  case class ExecutorStarted(
+      executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar])
   case class ExecutorStopped(executorId: Int)
 
   case class SetTaskManager(taskManager: ActorRef)
 
   case object StartExecutorsTimeOut
 
-  def props(userConfig: UserConfig, appContext: AppMasterContext, clusterConfig: Config, appName: String): Props = {
+  def props(
+      userConfig: UserConfig, appContext: AppMasterContext, clusterConfig: Config, appName: String)
+    : Props = {
     val executorFactory =
-        (executorContext: ExecutorContext,
-         userConfig: UserConfig,
-         address: Address,
-         executorId: ExecutorId) =>
-      Props(classOf[Executor], executorContext, userConfig)
-        .withDeploy(Deploy(scope = RemoteScope(address)))
+      (executorContext: ExecutorContext,
+        userConfig: UserConfig,
+        address: Address,
+        executorId: ExecutorId) =>
+        Props(classOf[Executor], executorContext, userConfig)
+          .withDeploy(Deploy(scope = RemoteScope(address)))
 
     Props(new ExecutorManager(userConfig, appContext, executorFactory, clusterConfig, appName))
   }
 
   case class ExecutorResourceUsageSummary(resources: Map[ExecutorId, Resource])
 
-  case class ExecutorInfo(executorId: ExecutorId, executor: ActorRef, worker: WorkerInfo, boundedJar: Option[AppJar])
+  case class ExecutorInfo(
+      executorId: ExecutorId, executor: ActorRef, worker: WorkerInfo, boundedJar: Option[AppJar])
 }
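
The new comment above OneForOneStrategy spells out the failure policy: a
failing executor is stopped, not restarted, and the AppMaster is expected to
start a replacement. A minimal sketch of that supervision style in Akka
(parent/child names are illustrative, not Gearpump code):

    import akka.actor.{Actor, OneForOneStrategy, Props}
    import akka.actor.SupervisorStrategy.Stop
    import scala.concurrent.duration._

    class Child extends Actor {
      def receive: Receive = {
        case "boom" => throw new RuntimeException("simulated executor failure")
      }
    }

    class Parent extends Actor {
      // Same shape as ExecutorManager's strategy: stop only the failing child.
      override val supervisorStrategy =
        OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
          case _: Throwable => Stop
        }
      private val child = context.actorOf(Props[Child], "0")
      def receive: Receive = {
        case msg => child forward msg
      }
    }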

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
index e769a0d..a2a9c74 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,51 +17,69 @@
  */
 package io.gearpump.streaming.appmaster
 
+import scala.concurrent.Future
+
 import akka.actor._
+import akka.pattern.ask
 import com.typesafe.config.Config
-import io.gearpump.{WorkerId, TimeStamp}
-import io.gearpump.streaming.task.{StartClock, TaskId}
-import io.gearpump.streaming.{ProcessorDescription, DAG}
+
+import io.gearpump.TimeStamp
 import io.gearpump.cluster.AppJar
 import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.partitioner.PartitionerDescription
 import io.gearpump.streaming.appmaster.JarScheduler._
-import io.gearpump.util.{LogUtil, Constants, Graph}
-import akka.pattern.ask
-import scala.concurrent.Future
+import io.gearpump.streaming.task.TaskId
+import io.gearpump.streaming.{DAG, ProcessorDescription}
+import io.gearpump.util.{Constants, Graph, LogUtil}
 
 /**
+ * Different processors of the stream application can use different jars. JarScheduler is the
+ * scheduler for different jars.
  *
- * With JarScheduler, we allows a DAG to be partitioned into several
- * parts, with each part use its own jar file.
+ * For a DAG of multiple processors, each processor can have its own jar. Tasks of the same
+ * jar are scheduled by a TaskScheduler, and TaskSchedulers are coordinated by JarScheduler.
  *
+ * At runtime, the implementation is delegated to the actor JarSchedulerImpl.
  */
-class JarScheduler(appId : Int, appName: String, config: Config, factory: ActorRefFactory) {
+class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRefFactory) {
   private val actor: ActorRef = factory.actorOf(Props(new JarSchedulerImpl(appId, appName, config)))
   private implicit val dispatcher = factory.dispatcher
   private implicit val timeout = Constants.FUTURE_TIMEOUT
 
+  /** Sets the current DAG version as active */
   def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = {
     actor ! TransitToNewDag
-    startClock.map {start =>
+    startClock.map { start =>
       actor ! NewDag(dag, start)
     }
   }
 
-  def getRequestDetails(): Future[Array[ResourceRequestDetail]] = {
-    (actor ? GetRequestDetails).asInstanceOf[Future[Array[ResourceRequestDetail]]]
+  /** AppMaster asks JarScheduler how many resources it wants */
+  def getResourceRequestDetails(): Future[Array[ResourceRequestDetail]] = {
+    (actor ? GetResourceRequestDetails).asInstanceOf[Future[Array[ResourceRequestDetail]]]
   }
 
-  def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource): Future[List[TaskId]] = {
-    (actor ? ScheduleTask(appJar, workerId, executorId, resource)).asInstanceOf[Future[List[TaskId]]]
+  /**
+   * AppMaster has resources allocated, and asks the jar scheduler to schedule tasks
+   * for this executor.
+   */
+  def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource)
+  : Future[List[TaskId]] = {
+    (actor ? ScheduleTask(appJar, workerId, executorId, resource))
+      .asInstanceOf[Future[List[TaskId]]]
   }
 
+  /**
+   * Some executor JVM process is dead. AppMaster asks jar scheduler to re-schedule the impacted
+   * tasks.
+   */
   def executorFailed(executorId: Int): Future[Option[ResourceRequestDetail]] = {
     (actor ? ExecutorFailed(executorId)).asInstanceOf[Future[Option[ResourceRequestDetail]]]
   }
 }
 
-object JarScheduler{
+object JarScheduler {
 
   case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest])
 
@@ -69,14 +87,24 @@ object JarScheduler{
 
   case object TransitToNewDag
 
-  case object GetRequestDetails
+  case object GetResourceRequestDetails
 
+  /**
+   * Schedule tasks for one appJar.
+   *
+   * @param appJar Application jar.
+   * @param workerId Worker machine Id.
+   * @param executorId Executor Id.
+   * @param resource Slots that are available.
+   */
   case class ScheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource)
 
+  /** Some executor JVM is dead, tries to recover tasks located on the failed executor */
   case class ExecutorFailed(executorId: Int)
 
-  class JarSchedulerImpl(appId : Int, appName: String, config: Config) extends Actor with Stash {
+  class JarSchedulerImpl(appId: Int, appName: String, config: Config) extends Actor with Stash {
 
+    // Each TaskScheduler maps to a jar.
     private var taskSchedulers = Map.empty[AppJar, TaskScheduler]
 
     private val LOG = LogUtil.getLogger(getClass)
@@ -84,26 +112,29 @@ object JarScheduler{
     def receive: Receive = waitForNewDag
 
     def waitForNewDag: Receive = {
-      case TransitToNewDag => // continue current state
+      case TransitToNewDag => // Continue current state
       case NewDag(dag, startTime) =>
 
         LOG.info(s"Init JarScheduler, dag version: ${dag.version}, startTime: $startTime")
 
         val processors = dag.processors.values.groupBy(_.jar)
+
         taskSchedulers = processors.map { jarAndProcessors =>
           val (jar, processors) = jarAndProcessors
-          //Construct the sub DAG
-          val graph = Graph.empty[ProcessorDescription, PartitionerDescription]
-          processors.foreach{processor =>
+          // Construct the sub DAG, each sub DAG maps to a separate jar.
+          val subGraph = Graph.empty[ProcessorDescription, PartitionerDescription]
+          processors.foreach { processor =>
             if (startTime < processor.life.death) {
-              graph.addVertex(processor)
+              subGraph.addVertex(processor)
             }
           }
-          val subDag = DAG(graph)
-          val taskScheduler = taskSchedulers.getOrElse(jar, new TaskSchedulerImpl(appId, appName, config))
+          val subDagForSingleJar = DAG(subGraph)
 
-          LOG.info(s"Set DAG for TaskScheduler, count: " + subDag.processors.size)
-          taskScheduler.setDAG(subDag)
+          val taskScheduler = taskSchedulers
+            .getOrElse(jar, new TaskSchedulerImpl(appId, appName, config))
+
+          LOG.info(s"Set DAG for TaskScheduler, count: " + subDagForSingleJar.processors.size)
+          taskScheduler.setDAG(subDagForSingleJar)
           jar -> taskScheduler
         }
         unstashAll()
@@ -113,15 +144,20 @@ object JarScheduler{
     }
 
     def ready: Receive = {
+      // A new DAG is coming; start waiting for it.
       case TransitToNewDag =>
         context.become(waitForNewDag)
-      case GetRequestDetails =>
+
+      case GetResourceRequestDetails =>
+
+        // Asks each TaskScheduler (one per jar) for its resource requests.
         val result: Array[ResourceRequestDetail] = taskSchedulers.map { jarAndScheduler =>
           val (jar, scheduler) = jarAndScheduler
           ResourceRequestDetail(jar, scheduler.getResourceRequests())
         }.toArray
         LOG.info(s"GetRequestDetails " + result.mkString(";"))
         sender ! result
+
       case ScheduleTask(appJar, workerId, executorId, resource) =>
         val result: List[TaskId] = taskSchedulers.get(appJar).map { scheduler =>
           scheduler.schedule(workerId, executorId, resource)
@@ -130,9 +166,9 @@ object JarScheduler{
         sender ! result
       case ExecutorFailed(executorId) =>
         val result: Option[ResourceRequestDetail] = taskSchedulers.
-          find(_._2.scheduledTasks(executorId).nonEmpty).map{ jarAndScheduler =>
-            ResourceRequestDetail(jarAndScheduler._1, jarAndScheduler._2.executorFailed(executorId))
-          }
+          find(_._2.scheduledTasks(executorId).nonEmpty).map { jarAndScheduler =>
+          ResourceRequestDetail(jarAndScheduler._1, jarAndScheduler._2.executorFailed(executorId))
+        }
         LOG.info(s"ExecutorFailed " + result.mkString(";"))
         sender ! result
     }
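
JarScheduler itself is a plain class that hides the actor JarSchedulerImpl
behind Future-returning methods via the ask pattern. A self-contained sketch
of that facade style (names are illustrative, not Gearpump code; mapTo is
used here as the type-safe alternative to the asInstanceOf cast in the diff):

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Future
    import scala.concurrent.duration._

    object AskFacadeSketch {
      case object GetCount

      class Counter extends Actor {
        private var n = 0
        def receive: Receive = {
          case "inc" => n += 1
          case GetCount => sender ! n
        }
      }

      // Facade: callers never see the ActorRef, only Future-based methods.
      class CounterFacade(system: ActorSystem) {
        private val actor: ActorRef = system.actorOf(Props(new Counter))
        private implicit val timeout: Timeout = Timeout(3.seconds)

        def inc(): Unit = actor ! "inc"
        def count(): Future[Int] = (actor ? GetCount).mapTo[Int]
      }
    }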

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
index cdcd3f6..3b2c8bf 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,15 +18,16 @@
 
 package io.gearpump.streaming.appmaster
 
-import io.gearpump.streaming.{ExecutorId, ProcessorId, LifeTime}
 import io.gearpump._
 import io.gearpump.cluster.AppMasterToMaster.AppMasterSummary
-import io.gearpump.cluster.{UserConfig, MasterToAppMaster}
 import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus
-import AppMaster.ExecutorBrief
+import io.gearpump.cluster.{MasterToAppMaster, UserConfig}
+import io.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
+import io.gearpump.streaming.{ExecutorId, LifeTime, ProcessorId}
 import io.gearpump.util.Graph
 import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
 
+/** Stream application summary, used for REST API */
 case class StreamAppMasterSummary(
     appType: String = "streaming",
     appId: Int,
@@ -42,7 +43,7 @@ case class StreamAppMasterSummary(
     dag: Graph[ProcessorId, String] = null,
     executors: List[ExecutorBrief] = null,
     processors: Map[ProcessorId, ProcessorSummary] = Map.empty[ProcessorId, ProcessorSummary],
-    // hiearachy level for each processor
+    // Hierarchy level for each processor
     processorLevels: Map[ProcessorId, Int] = Map.empty[ProcessorId, Int],
     historyMetricsConfig: HistoryMetricsConfig = null)
   extends AppMasterSummary
@@ -50,11 +51,11 @@ case class StreamAppMasterSummary(
 case class TaskCount(count: Int)
 
 case class ProcessorSummary(
-  id: ProcessorId,
-  taskClass: String,
-  parallelism : Int,
-  description: String,
-  taskConf: UserConfig,
-  life: LifeTime,
-  executors: List[ExecutorId],
-  taskCount: Map[ExecutorId, TaskCount])
\ No newline at end of file
+    id: ProcessorId,
+    taskClass: String,
+    parallelism: Int,
+    description: String,
+    taskConf: UserConfig,
+    life: LifeTime,
+    executors: List[ExecutorId],
+    taskCount: Map[ExecutorId, TaskCount])
\ No newline at end of file
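
StreamAppMasterSummary relies on case-class default arguments so a REST
handler only needs to populate the fields it actually knows. A trimmed,
hypothetical version showing the effect (not the real class):

    // Hypothetical, trimmed-down summary: only appId is mandatory.
    case class MiniSummary(
        appType: String = "streaming",
        appId: Int,
        appName: String = null,
        clock: Long = 0L)

    object SummarySketch {
      def main(args: Array[String]): Unit = {
        // Named arguments let callers skip every field that has a default.
        val s = MiniSummary(appId = 7, appName = "wordcount")
        println(s) // MiniSummary(streaming,7,wordcount,0)
      }
    }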

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
index c456a06..6e7ebc6 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,35 +17,38 @@
  */
 package io.gearpump.streaming.appmaster
 
-import com.typesafe.config.{ConfigValueFactory, ConfigFactory, ConfigRenderOptions, Config}
-import TaskLocator.{Localities, WorkerLocality, NonLocality, Locality}
-import io.gearpump.WorkerId
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions, ConfigValueFactory}
+
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.streaming.Constants
+import io.gearpump.streaming.appmaster.TaskLocator.{Localities, Locality, NonLocality, WorkerLocality}
 import io.gearpump.streaming.task.TaskId
-import scala.util.Try
-import scala.collection.JavaConverters._
 
 /**
  * TaskLocator is used to decide which machine one task should run on.
  *
- * User can specify config [[Constants.GEARPUMP_STREAMING_LOCALITIES]] to decide
- * to control which machine the task is running on.
+ * User can specify config [[io.gearpump.streaming.Constants#GEARPUMP_STREAMING_LOCALITIES]]
+ * to control which machine each task runs on.
  */
 class TaskLocator(appName: String, config: Config) {
   private val taskLocalities: Map[TaskId, Locality] = loadTaskLocalities(config)
 
-  def locateTask(taskId: TaskId) : Locality = {
+  /** Finds the worker a given task should run on */
+  def locateTask(taskId: TaskId): Locality = {
     taskLocalities.getOrElse(taskId, NonLocality)
   }
 
-  private def loadTaskLocalities(config: Config) : Map[TaskId, Locality] = {
-    import Constants.GEARPUMP_STREAMING_LOCALITIES
-    Try(config.getConfig(s"$GEARPUMP_STREAMING_LOCALITIES.$appName")).map {appConfig =>
+  private def loadTaskLocalities(config: Config): Map[TaskId, Locality] = {
+    import io.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES
+    Try(config.getConfig(s"$GEARPUMP_STREAMING_LOCALITIES.$appName")).map { appConfig =>
       val json = appConfig.root().render(ConfigRenderOptions.concise)
       Localities.fromJson(json)
     }.map { localityConfig =>
       import localityConfig.localities
-      localities.keySet.flatMap {workerId =>
+      localities.keySet.flatMap { workerId =>
         val tasks = localities(workerId)
         tasks.map((_, WorkerLocality(workerId)))
       }.toArray.toMap
@@ -57,15 +60,20 @@ object TaskLocator {
 
   trait Locality
 
+  /** Means the resource is required from the specified worker */
   case class WorkerLocality(workerId: WorkerId) extends Locality
 
+  /** Means no preference on worker */
   object NonLocality extends Locality
 
+  /** Locality settings. Mapping from workerId to the list of taskIds on that worker */
   case class Localities(localities: Map[WorkerId, Array[TaskId]])
 
   object Localities {
     val pattern = "task_([0-9]+)_([0-9]+)".r
 
+    // To avoid polluting the classpath, we do the JSON translation ourselves instead of
+    // introducing a JSON library dependency directly.
     def fromJson(json: String): Localities = {
       val localities = ConfigFactory.parseString(json).getAnyRef("localities")
         .asInstanceOf[java.util.Map[String, String]].asScala.map { pair =>
@@ -80,8 +88,9 @@ object TaskLocator {
     }
 
     def toJson(localities: Localities): String = {
-      val map = localities.localities.toList.map {pair =>
-        (WorkerId.render(pair._1), pair._2.map(task => s"task_${task.processorId}_${task.index}").mkString(","))
+      val map = localities.localities.toList.map { pair =>
+        (WorkerId.render(pair._1), pair._2.map(task =>
+          s"task_${task.processorId}_${task.index}").mkString(","))
       }.toMap.asJava
       ConfigFactory.empty().withValue("localities", ConfigValueFactory.fromAnyRef(map)).
         root.render(ConfigRenderOptions.concise())
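
For context, a hedged sketch of how this config path fits together end to end:
building a locality config and asking TaskLocator where a task should run. The
app name "wordcount" and the worker key "0_0" are illustrative stand-ins (the
real key is whatever WorkerId.render produces), and the config prefix is the
GEARPUMP_STREAMING_LOCALITIES constant from io.gearpump.streaming.Constants.

    import com.typesafe.config.ConfigFactory
    import io.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES
    import io.gearpump.streaming.appmaster.TaskLocator
    import io.gearpump.streaming.task.TaskId

    // "0_0" stands in for a rendered WorkerId; task values follow the
    // task_<processorId>_<index> pattern parsed by Localities.fromJson.
    val config = ConfigFactory.parseString(
      s"""$GEARPUMP_STREAMING_LOCALITIES.wordcount.localities {
         |  "0_0" = "task_0_0,task_0_1"
         |}""".stripMargin)

    val locator = new TaskLocator("wordcount", config)
    locator.locateTask(TaskId(0, 0)) // WorkerLocality for the worker keyed "0_0"
    locator.locateTask(TaskId(5, 0)) // NonLocality: no preference configured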

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala
index e1f4872..51f2f9c 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala
@@ -18,12 +18,17 @@
 
 package io.gearpump.streaming.appmaster
 
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
 import akka.actor._
 import akka.pattern.ask
+import org.slf4j.Logger
+
 import io.gearpump.TimeStamp
 import io.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
 import io.gearpump.streaming.AppMasterToExecutor._
-import io.gearpump.streaming.ExecutorToAppMaster.{UnRegisterTask, MessageLoss, RegisterTask}
+import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, UnRegisterTask}
 import io.gearpump.streaming._
 import io.gearpump.streaming.appmaster.AppMaster.{AllocateResourceTimeOut, LookupTaskActorRef, TaskActorRef}
 import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess}
@@ -36,31 +41,26 @@ import io.gearpump.streaming.executor.ExecutorRestartPolicy
 import io.gearpump.streaming.task._
 import io.gearpump.streaming.util.ActorPathUtil
 import io.gearpump.util.{Constants, LogUtil}
-import org.slf4j.Logger
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
 
 /**
  *
  * TaskManager tracks all tasks' status.
  *
  * It is a state machine with three states:
- * 1. applicationReady
- * 2. recovery
- * 3. dynamicDag
+ *  1. applicationReady
+ *  2. recovery
+ *  3. dynamicDag
  *
  * When in state applicationReady:
- * 1. When there is message-loss or JVM crash, transit to state recovery.
- * 2. When user modify the DAG, transit to dynamicDag.
+ *  1. When there is message loss or a JVM crash, it transits to state recovery.
+ *  2. When the user modifies the DAG, it transits to dynamicDag.
  *
  * When in state recovery:
- * 1. When all tasks has been recovered, transit to applicationReady.
+ *  1. When all tasks have been recovered, it transits to applicationReady.
  *
  * When in state dynamicDag:
- * 1. When dyanmic dag transition is complete, transit to applicationReady.
- * 2. When there is message loss or JVM crash, transit to state recovery.
- *
+ *  1. When the dynamic DAG transition is complete, it transits to applicationReady.
+ *  2. When there is message loss or a JVM crash, it transits to state recovery.
  */
 private[appmaster] class TaskManager(
     appId: Int,
@@ -73,13 +73,16 @@ private[appmaster] class TaskManager(
   extends Actor {
 
   private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
-  val systemConfig = context.system.settings.config
+  private val systemConfig = context.system.settings.config
 
   private val ids = new SessionIdFactory()
 
-  private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries = 5, withinTimeRange = 20 seconds)
-  implicit val timeout = Constants.FUTURE_TIMEOUT
-  implicit val actorSystem = context.system
+  private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries = 5,
+    withinTimeRange = 20.seconds)
+
+  private implicit val timeout = Constants.FUTURE_TIMEOUT
+  private implicit val actorSystem = context.system
+
   import context.dispatcher
 
   dagManager ! WatchChange(watcher = self)
@@ -91,7 +94,6 @@ private[appmaster] class TaskManager(
 
   private var startClock: Future[TimeStamp] = getStartClock
 
-
   def receive: Receive = applicationReady(DagReadyState.empty)
 
   private def onClientQuery(taskRegistry: TaskRegistry): Receive = {
@@ -104,14 +106,14 @@ private[appmaster] class TaskManager(
       val requestor = sender
       executorId.map { executorId =>
         val taskPath = ActorPathUtil.taskActorPath(appMaster, executorId, taskId)
-        context.actorSelection(taskPath).resolveOne(3 seconds).map { taskActorRef =>
+        context.actorSelection(taskPath).resolveOne(3.seconds).map { taskActorRef =>
           requestor ! TaskActorRef(taskActorRef)
         }
       }
   }
 
   /**
-   * state applicationReady
+   * State applicationReady
    */
   def applicationReady(state: DagReadyState): Receive = {
     executorManager ! state.taskRegistry.usedResource
@@ -121,7 +123,7 @@ private[appmaster] class TaskManager(
 
     val recoverRegistry = new TaskRegistry(expectedTasks = state.dag.tasks,
       deadTasks = state.taskRegistry.deadTasks)
-    
+
     val recoverState = new StartDagState(state.dag, recoverRegistry)
 
     val onError: Receive = {
@@ -153,8 +155,8 @@ private[appmaster] class TaskManager(
 
           val dagDiff = migrate(state.dag, newDag)
           jarScheduler.setDag(newDag, startClock)
-          val resourceRequestsDetails = jarScheduler.getRequestDetails()
-          resourceRequestsDetails.map{ details =>
+          val resourceRequestsDetails = jarScheduler.getResourceRequestDetails()
+          resourceRequestsDetails.map { details =>
             details.foreach { detail =>
               if (detail.requests.length > 0 && detail.requests.exists(!_.resource.isEmpty)) {
                 executorManager ! StartExecutors(detail.requests, detail.jar)
@@ -168,7 +170,8 @@ private[appmaster] class TaskManager(
             executors.foreach { pair =>
               val (executorId, tasks) = pair
               modifiedTasks ++= tasks
-              dagManager ! GetTaskLaunchData(newDag.version, processorId, ChangeTasksOnExecutor(executorId, tasks))
+              dagManager ! GetTaskLaunchData(newDag.version, processorId,
+                ChangeTasksOnExecutor(executorId, tasks))
             }
           }
 
@@ -200,12 +203,13 @@ private[appmaster] class TaskManager(
         context.become(applicationReady(newState))
     }
 
-    // recover to same version
-    onClientQuery(state.taskRegistry) orElse onError orElse onNewDag orElse onUnRegisterTask orElse unHandled("applicationReady")
+    // Recovers to same version
+    onClientQuery(state.taskRegistry) orElse onError orElse onNewDag orElse
+      onUnRegisterTask orElse unHandled("applicationReady")
   }
 
   /**
-   * state dynamicDag
+   * State dynamicDag
    */
   def dynamicDag(state: StartDagState, recoverState: StartDagState): Receive = {
     LOG.info(s"DynamicDag transit to dag version: ${state.dag.version}...")
@@ -231,11 +235,13 @@ private[appmaster] class TaskManager(
     case executor: ExecutorStarted =>
       import executor.{boundedJar, executorId, resource, workerId}
       val taskIdsFuture = jarScheduler.scheduleTask(boundedJar.get, workerId, executorId, resource)
-      taskIdsFuture.foreach {taskIds =>
-        LOG.info(s"Executor $executor has been started, start to schedule tasks: ${taskIds.mkString(",")}")
+      taskIdsFuture.foreach { taskIds =>
+        LOG.info(s"Executor $executor has been started, " +
+          s"start to schedule tasks: ${taskIds.mkString(",")}")
         taskIds.groupBy(_.processorId).foreach { pair =>
           val (processorId, tasks) = pair
-          dagManager ! GetTaskLaunchData(state.dag.version, processorId, StartTasksOnExecutor(executor.executorId, tasks))
+          dagManager ! GetTaskLaunchData(state.dag.version, processorId,
+            StartTasksOnExecutor(executor.executorId, tasks))
         }
       }
 
@@ -250,7 +256,8 @@ private[appmaster] class TaskManager(
           tasks.foreach(executorRestartPolicy.addTaskToExecutor(executorId, _))
         case ChangeTasksOnExecutor(executorId, tasks) =>
           LOG.info("change Task on executor: " + executorId + ", tasks: " + tasks)
-          val changeTasks = ChangeTasks(tasks, state.dag.version, processorDescription.life, subscribers)
+          val changeTasks = ChangeTasks(tasks, state.dag.version, processorDescription.life,
+            subscribers)
           executorManager ! UniCast(executorId, changeTasks)
         case other =>
           LOG.error(s"severe error! we expect ExecutorStarted but get ${other.getClass.toString}")
@@ -258,7 +265,7 @@ private[appmaster] class TaskManager(
     case TasksLaunched =>
     // We will track all launched tasks by message RegisterTask
     case TasksChanged(tasks) =>
-      tasks.foreach(task =>state.taskChangeRegistry.taskChanged(task))
+      tasks.foreach(task => state.taskChangeRegistry.taskChanged(task))
 
       if (allTasksReady(state)) {
         broadcastLocations(state)
@@ -306,10 +313,11 @@ private[appmaster] class TaskManager(
 
   def onExecutorError: Receive = {
     case ExecutorStopped(executorId) =>
-      if(executorRestartPolicy.allowRestartExecutor(executorId)) {
+      if (executorRestartPolicy.allowRestartExecutor(executorId)) {
         jarScheduler.executorFailed(executorId).foreach { resourceRequestDetail =>
           if (resourceRequestDetail.isDefined) {
-            executorManager ! StartExecutors(resourceRequestDetail.get.requests, resourceRequestDetail.get.jar)
+            executorManager ! StartExecutors(resourceRequestDetail.get.requests,
+              resourceRequestDetail.get.jar)
           }
         }
       } else {
@@ -330,7 +338,7 @@ private[appmaster] class TaskManager(
   }
 
   /**
-   *  state recovery
+   * State recovery
    */
   def recovery(state: StartDagState): Receive = {
     val recoverDagVersion = state.dag.version
@@ -344,7 +352,7 @@ private[appmaster] class TaskManager(
     LOG.info(s"goto state Recovery(recoverDag = $recoverDagVersion)...")
     val ignoreClock: Receive = {
       case clock: ClockEvent =>
-      //ignore clock events.
+      // Ignores clock events.
     }
 
     if (state.dag.isEmpty) {
@@ -354,7 +362,8 @@ private[appmaster] class TaskManager(
         deadTasks = state.taskRegistry.deadTasks)
 
       val recoverState = new StartDagState(state.dag, registry)
-      ignoreClock orElse startDag(state, recoverState) orElse onExecutorError orElse unHandled("recovery")
+      ignoreClock orElse startDag(state, recoverState) orElse onExecutorError orElse
+        unHandled("recovery")
     }
   }
 
@@ -364,19 +373,17 @@ private[appmaster] class TaskManager(
   }
 }
 
-private [appmaster] object TaskManager {
+private[appmaster] object TaskManager {
 
   /**
    * When the application is ready, it transits to DagReadyState
    */
-  class DagReadyState(
-    val dag: DAG,
-    val taskRegistry: TaskRegistry)
+  class DagReadyState(val dag: DAG, val taskRegistry: TaskRegistry)
 
   object DagReadyState {
     def empty: DagReadyState = {
       new DagReadyState(
-        DAG.empty().copy(version = -1),
+        DAG.empty.copy(version = -1),
         new TaskRegistry(List.empty[TaskId]))
     }
   }
@@ -385,10 +392,10 @@ private [appmaster] object TaskManager {
    * When the application is booting up or doing recovery, it uses StartDagState
    */
   class StartDagState(
-    val dag: DAG,
-    val taskRegistry: TaskRegistry,
-    val taskChangeRegistry: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId]),
-    val executorReadyRegistry: ExecutorRegistry = new ExecutorRegistry)
+      val dag: DAG,
+      val taskRegistry: TaskRegistry,
+      val taskChangeRegistry: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId]),
+      val executorReadyRegistry: ExecutorRegistry = new ExecutorRegistry)
 
   case object GetTaskList
 
@@ -397,18 +404,17 @@ private [appmaster] object TaskManager {
   case class FailedToRecover(errorMsg: String)
 
   /**
-   * Start new Tasks on Executor <executorId>
+   * Starts new tasks on executor executorId
    */
   case class StartTasksOnExecutor(executorId: Int, tasks: List[TaskId])
 
   /**
-   * Change existing tasks on executor <executorId>
+   * Changes existing tasks on executor executorId
    */
   case class ChangeTasksOnExecutor(executorId: Int, tasks: List[TaskId])
 
-
   /**
-   * Track the registration of all new started executors.
+   * Tracks the registration of all newly started executors.
    */
   class ExecutorRegistry {
     private var registeredExecutors = Set.empty[ExecutorId]
@@ -423,7 +429,7 @@ private [appmaster] object TaskManager {
   }
 
   /**
-   * Track the registration of all changed tasks.
+   * Tracks the registration of all changed tasks.
    */
   class TaskChangeRegistry(targetTasks: List[TaskId]) {
     private var registeredTasks = Set.empty[TaskId]
@@ -443,12 +449,12 @@ private [appmaster] object TaskManager {
    * DAGDiff is used to track impacted processors when doing dynamic dag.
    */
   case class DAGDiff(
-    addedProcessors: List[ProcessorId],
-    modifiedProcessors: List[ProcessorId],
-    impactedUpstream: List[ProcessorId])
+      addedProcessors: List[ProcessorId],
+      modifiedProcessors: List[ProcessorId],
+      impactedUpstream: List[ProcessorId])
 
   /**
-   * Migrate from old DAG to new DAG, return DAGDiff
+   * Migrates from the old DAG to the new DAG, returning a DAGDiff
    */
   def migrate(leftDAG: DAG, rightDAG: DAG): DAGDiff = {
     val left = leftDAG.processors.keySet
@@ -457,19 +463,19 @@ private [appmaster] object TaskManager {
     val added = right -- left
     val join = right -- added
 
-    val modified = join.filter {processorId =>
+    val modified = join.filter { processorId =>
       leftDAG.processors(processorId) != rightDAG.processors(processorId)
     }
 
     val upstream = (list: Set[ProcessorId]) => {
-      list.flatMap {processorId =>
+      list.flatMap { processorId =>
         rightDAG.graph.incomingEdgesOf(processorId).map(_._1).toSet
       } -- list
     }
 
     val impactedUpstream = upstream(added ++ modified)
 
-    // all upstream will be affected.
+    // All upstream processors are affected and should be handled properly.
     DAGDiff(added.toList, modified.toList, impactedUpstream.toList)
   }
 
@@ -480,7 +486,7 @@ private [appmaster] object TaskManager {
     private var nextSessionId = 1
 
     /**
-     * return a new session Id for new task
+     * Returns a new session Id for new task
      */
     final def newSessionId: Int = {
       val sessionId = nextSessionId
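
The three states documented in the class comment above map directly onto Akka's
context.become. A minimal self-contained sketch of the same transition
structure (the message names below are illustrative stand-ins, not Gearpump's
actual protocol messages):

    import akka.actor.Actor

    // Stand-in trigger messages; the real triggers are MessageLoss,
    // dynamic-DAG change requests, and task registration completion.
    case object MessageLossDetected
    case object NewDagSubmitted
    case object DagTransitionDone
    case object AllTasksRecovered

    class StateSketch extends Actor {
      def receive: Receive = applicationReady

      def applicationReady: Receive = {
        case MessageLossDetected => context.become(recovery)
        case NewDagSubmitted     => context.become(dynamicDag)
      }

      def dynamicDag: Receive = {
        case DagTransitionDone   => context.become(applicationReady)
        case MessageLossDetected => context.become(recovery)
      }

      def recovery: Receive = {
        case AllTasksRecovered => context.become(applicationReady)
      }
    }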

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala
index 79371ea..adfdeba 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala
@@ -18,18 +18,18 @@
 
 package io.gearpump.streaming.appmaster
 
-import io.gearpump.streaming.{ExecutorId, ProcessorId}
-import io.gearpump.streaming.task.TaskId
+import org.slf4j.Logger
+
 import io.gearpump.cluster.scheduler.Resource
-import ExecutorManager.ExecutorResourceUsageSummary
-import TaskRegistry._
+import io.gearpump.streaming.appmaster.ExecutorManager.ExecutorResourceUsageSummary
+import io.gearpump.streaming.appmaster.TaskRegistry._
+import io.gearpump.streaming.task.TaskId
+import io.gearpump.streaming.{ExecutorId, ProcessorId}
 import io.gearpump.transport.HostPort
 import io.gearpump.util.LogUtil
-import org.slf4j.Logger
 
 /**
- * TaskRegistry is used to track the registration of all tasks
- * when one application is booting up.
+ * Tracks the registration of all tasks while the application is booting up.
  */
 class TaskRegistry(val expectedTasks: List[TaskId],
     var registeredTasks: Map[TaskId, TaskLocation] = Map.empty[TaskId, TaskLocation],
@@ -43,6 +43,10 @@ class TaskRegistry(val expectedTasks: List[TaskId],
    * When a task is booted, it needs to call registerTask to register itself.
    * If this task is valid, then accept it, otherwise reject it.
    *
+   * @param taskId Task that registers itself to TaskRegistry.
+   * @param location The host and port this task is running on. NOTE: this host and port
+   *                 is NOT the Akka remoting host and port. Instead, it is the host and
+   *                 port of the custom netty layer, see [[io.gearpump.transport.netty.Context]].
    */
   def registerTask(taskId: TaskId, location: TaskLocation): RegisterTaskStatus = {
     val processorId = taskId.processorId
@@ -57,13 +61,13 @@ class TaskRegistry(val expectedTasks: List[TaskId],
   }
 
   def copy(expectedTasks: List[TaskId] = this.expectedTasks,
-    registeredTasks: Map[TaskId, TaskLocation] = this.registeredTasks,
-    deadTasks: Set[TaskId] = this.deadTasks): TaskRegistry = {
+      registeredTasks: Map[TaskId, TaskLocation] = this.registeredTasks,
+      deadTasks: Set[TaskId] = this.deadTasks): TaskRegistry = {
     new TaskRegistry(expectedTasks, registeredTasks, deadTasks)
   }
 
   def getTaskLocations: TaskLocations = {
-    val taskLocations =  registeredTasks.toList.groupBy(_._2.host).map{ pair =>
+    val taskLocations = registeredTasks.toList.groupBy(_._2.host).map { pair =>
       val (k, v) = pair
       val taskIds = v.map(_._1)
       (k, taskIds.toSet)
@@ -74,16 +78,18 @@ class TaskRegistry(val expectedTasks: List[TaskId],
   def getTaskExecutorMap: Map[TaskId, ExecutorId] = {
     getTaskLocations.locations.flatMap { pair =>
       val (hostPort, taskSet) = pair
-      taskSet.map{ taskId =>
+      taskSet.map { taskId =>
         (taskId, getExecutorId(taskId).getOrElse(-1))
       }
     }
   }
 
+  /** Queries the executor Id the task is running on */
   def getExecutorId(taskId: TaskId): Option[Int] = {
     registeredTasks.get(taskId).map(_.executorId)
   }
 
+  /** Gets list of allocated executor Ids */
   def executors: List[ExecutorId] = {
     registeredTasks.toList.map(_._2.executorId)
   }
@@ -101,6 +107,7 @@ class TaskRegistry(val expectedTasks: List[TaskId],
     registeredTasks.keys.toList.filter(_.processorId == processorId)
   }
 
+  /** Lists the executors that the given processor's tasks are running on */
   def processorExecutors(processorId: ProcessorId): Map[ExecutorId, List[TaskId]] = {
     val taskToExecutor = filterTasks(processorId).flatMap { taskId =>
       getExecutorId(taskId).map { executorId =>
@@ -108,15 +115,16 @@ class TaskRegistry(val expectedTasks: List[TaskId],
       }
     }
 
-    val executorToTasks = taskToExecutor.groupBy(_._2).map{kv =>
+    val executorToTasks = taskToExecutor.groupBy(_._2).map { kv =>
       val (k, v) = kv
       (k, v.map(_._1))
     }
     executorToTasks
   }
 
+  /** Summary of how many resources are used by all running tasks */
   def usedResource: ExecutorResourceUsageSummary = {
-    val resourceMap = registeredTasks.foldLeft(Map.empty[ExecutorId, Resource]) {(map, task) =>
+    val resourceMap = registeredTasks.foldLeft(Map.empty[ExecutorId, Resource]) { (map, task) =>
       val resource = map.getOrElse(task._2.executorId, Resource(0)) + Resource(1)
       map + (task._2.executorId -> resource)
     }
@@ -131,5 +139,5 @@ object TaskRegistry {
 
   case class TaskLocation(executorId: Int, host: HostPort)
 
-  case class TaskLocations(locations : Map[HostPort, Set[TaskId]])
+  case class TaskLocations(locations: Map[HostPort, Set[TaskId]])
 }
\ No newline at end of file
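
The usedResource fold above charges one slot per registered task to the
executor hosting it. The same aggregation in isolation, with a stand-in
Resource type (the real one is io.gearpump.cluster.scheduler.Resource):

    // Stand-in for the cluster Resource type; one registered task = one slot.
    case class Resource(slots: Int) {
      def +(other: Resource): Resource = Resource(slots + other.slots)
    }

    // taskId -> executorId, as recorded when tasks register
    val registered = Map("task_0_0" -> 0, "task_0_1" -> 0, "task_1_0" -> 1)

    val perExecutor = registered.foldLeft(Map.empty[Int, Resource]) { (map, task) =>
      val resource = map.getOrElse(task._2, Resource(0)) + Resource(1)
      map + (task._2 -> resource)
    }
    // perExecutor == Map(0 -> Resource(2), 1 -> Resource(1))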

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
index 39f427a..62dff6c 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,19 @@
 package io.gearpump.streaming.appmaster
 
 import com.typesafe.config.Config
-import io.gearpump.{WorkerId, TimeStamp}
+
 import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.streaming.DAG
 import io.gearpump.streaming.appmaster.TaskLocator.{Locality, WorkerLocality}
 import io.gearpump.streaming.appmaster.TaskScheduler.{Location, TaskStatus}
 import io.gearpump.streaming.task.TaskId
-import io.gearpump.util.{Constants, LogUtil}
-import org.slf4j.Logger
+import io.gearpump.util.Constants
 
 /**
- * This schedules tasks to run for new allocated resources.
+ * Schedules tasks to run on newly allocated resources. TaskScheduler only schedules tasks
+ * that share the same jar. For scheduling across multiple jars, see
+ * [[io.gearpump.streaming.appmaster.JarScheduler]].
  */
 trait TaskScheduler {
 
@@ -44,8 +46,8 @@ trait TaskScheduler {
   def getResourceRequests(): Array[ResourceRequest]
 
   /**
-   * This notify the scheduler that a resource slot on {workerId} and {executorId} is allocated, and
-   * expect a task to be scheduled in return.
+   * This notifies the scheduler that a resource slot on {workerId} and {executorId} is
+   * allocated, and expects a task to be scheduled in return.
    * Task locality should be considered when deciding whether to offer a task on target {worker}
    * and {executor}
    *
@@ -53,24 +55,24 @@ trait TaskScheduler {
    * @param executorId which executorId this resource belongs to.
    * @return a list of tasks
    */
-  def schedule(workerId : WorkerId, executorId: Int, resource: Resource) : List[TaskId]
+  def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId]
 
   /**
-   * This notify the scheduler that {executorId} is failed, and expect a set of
+   * This notifies the scheduler that {executorId} has failed, and expects a set of
    * ResourceRequest for all failed tasks on that executor.
    *
    * @param executorId executor that failed
    * @return resource requests of the failed executor
    */
-  def executorFailed(executorId: Int) : Array[ResourceRequest]
+  def executorFailed(executorId: Int): Array[ResourceRequest]
 
   /**
-    * Query the task list that already scheduled on the executor
-    *
-    * @param executorId executor to query
-    * @return a list of tasks
-    */
-  def scheduledTasks(executorId: Int) : List[TaskId]
+   * Queries the list of tasks already scheduled on the executor
+   *
+   * @param executorId executor to query
+   * @return a list of tasks
+   */
+  def scheduledTasks(executorId: Int): List[TaskId]
 }
 
 object TaskScheduler {
@@ -79,12 +81,12 @@ object TaskScheduler {
   class TaskStatus(val taskId: TaskId, val preferLocality: Locality, var allocation: Location)
 }
 
-class TaskSchedulerImpl(appId : Int, appName: String, config: Config)  extends TaskScheduler {
+class TaskSchedulerImpl(appId: Int, appName: String, config: Config) extends TaskScheduler {
   private val executorNum = config.getInt(Constants.APPLICATION_EXECUTOR_NUMBER)
 
   private var tasks = List.empty[TaskStatus]
 
-  // find the locality of the tasks
+  // Finds the locality of the tasks
   private val taskLocator = new TaskLocator(appName, config)
 
   override def setDAG(dag: DAG): Unit = {
@@ -96,15 +98,15 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config)  extends T
     }
   }
 
-  def getResourceRequests(): Array[ResourceRequest] ={
+  def getResourceRequests(): Array[ResourceRequest] = {
     fetchResourceRequests(fromOneWorker = false)
   }
 
-  import Relaxation._
-  private def fetchResourceRequests(fromOneWorker: Boolean = false): Array[ResourceRequest] ={
+  import io.gearpump.cluster.scheduler.Relaxation._
+  private def fetchResourceRequests(fromOneWorker: Boolean = false): Array[ResourceRequest] = {
     var workersResourceRequest = Map.empty[WorkerId, Resource]
 
-    tasks.filter(_.allocation == null).foreach{task =>
+    tasks.filter(_.allocation == null).foreach { task =>
       task.preferLocality match {
         case WorkerLocality(workerId) =>
           val current = workersResourceRequest.getOrElse(workerId, Resource.empty)
@@ -116,7 +118,7 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config)  extends T
       }
     }
 
-    workersResourceRequest.map {workerIdAndResource =>
+    workersResourceRequest.map { workerIdAndResource =>
       val (workerId, resource) = workerIdAndResource
       if (workerId == WorkerId.unspecified) {
         ResourceRequest(resource, workerId = WorkerId.unspecified, executorNum = executorNum)
@@ -126,23 +128,26 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config)  extends T
     }.toArray
   }
 
-  override def schedule(workerId : WorkerId, executorId: Int, resource: Resource) : List[TaskId] = {
+  override def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId] = {
     var scheduledTasks = List.empty[TaskId]
     val location = Location(workerId, executorId)
-    // schedule tasks for specific worker
+    // Schedules tasks for specific worker
     scheduledTasks ++= scheduleTasksForLocality(resource, location,
       (locality) => locality == WorkerLocality(workerId))
 
-    // schedule tasks without specific location preference
-    scheduledTasks ++= scheduleTasksForLocality(resource - Resource(scheduledTasks.length), location, (locality) => true)
+    // Schedules tasks without specific location preference
+    scheduledTasks ++= scheduleTasksForLocality(resource - Resource(scheduledTasks.length),
+      location, (locality) => true)
     scheduledTasks
   }
 
-  private def scheduleTasksForLocality(resource: Resource, resourceLocation: Location, matcher: (Locality) => Boolean): List[TaskId] = {
+  private def scheduleTasksForLocality(
+      resource: Resource, resourceLocation: Location, matcher: (Locality) => Boolean)
+    : List[TaskId] = {
     var scheduledTasks = List.empty[TaskId]
     var index = 0
     var remain = resource.slots
-    while(index < tasks.length && remain > 0) {
+    while (index < tasks.length && remain > 0) {
       val taskStatus = tasks(index)
       if (taskStatus.allocation == null && matcher(taskStatus.preferLocality)) {
         taskStatus.allocation = resourceLocation
@@ -154,18 +159,19 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config)  extends T
     scheduledTasks
   }
 
-  override def executorFailed(executorId: Int) : Array[ResourceRequest] = {
+  override def executorFailed(executorId: Int): Array[ResourceRequest] = {
     val failedTasks = tasks.filter { status =>
       status.allocation != null && status.allocation.executorId == executorId
     }
-    // clean the location of failed tasks
+    // Cleans the location of failed tasks
     failedTasks.foreach(_.allocation = null)
 
-    Array(ResourceRequest(Resource(failedTasks.length), workerId = WorkerId.unspecified, relaxation = ONEWORKER))
+    Array(ResourceRequest(Resource(failedTasks.length),
+      workerId = WorkerId.unspecified, relaxation = ONEWORKER))
   }
 
   override def scheduledTasks(executorId: Int): List[TaskId] = {
-    tasks.filter{ status =>
+    tasks.filter { status =>
       status.allocation != null && status.allocation.executorId == executorId
     }.map(_.taskId)
   }