You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:57 UTC

[47/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
index 8847df2..0996381 100644
--- a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
+++ b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,17 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.cluster.master
 
 import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
 
+/** Master status. Synced means all masters are live and synced. */
 object MasterStatus {
   type Type = String
   val Synced = "synced"
   val UnSynced = "unsynced"
 }
 
-
 case class MasterNode(host: String, port: Int) {
   def toTuple: (String, Int) = {
     (host, port)
@@ -33,18 +34,18 @@ case class MasterNode(host: String, port: Int) {
 }
 
 /**
- * Master information for REST API call
+ * Master information returned for REST API call
  */
 case class MasterSummary(
-  leader: MasterNode,
-  cluster: List[MasterNode],
-  aliveFor: Long,
-  logFile: String,
-  jarStore: String,
-  masterStatus: MasterStatus.Type,
-  homeDirectory: String,
-  activities: List[MasterActivity],
-  jvmName: String,
-  historyMetricsConfig: HistoryMetricsConfig = null)
+    leader: MasterNode,
+    cluster: List[MasterNode],
+    aliveFor: Long,
+    logFile: String,
+    jarStore: String,
+    masterStatus: MasterStatus.Type,
+    homeDirectory: String,
+    activities: List[MasterActivity],
+    jvmName: String,
+    historyMetricsConfig: HistoryMetricsConfig = null)
 
 case class MasterActivity(time: Long, event: String)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
index da17829..b25162e 100644
--- a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
+++ b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,50 +15,66 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.cluster.scheduler
 
 import akka.actor.ActorRef
-import io.gearpump.WorkerId
 
-case class Resource(slots : Int) {
-  def +(other : Resource): Resource = Resource(slots + other.slots)
+import io.gearpump.cluster.worker.WorkerId
+
+case class Resource(slots: Int) {
 
-  def -(other : Resource): Resource = Resource(slots - other.slots)
+  // scalastyle:off spaces.after.plus
+  def +(other: Resource): Resource = Resource(slots + other.slots)
+  // scalastyle:on spaces.after.plus
 
-  def >(other : Resource): Boolean = slots > other.slots
+  def -(other: Resource): Resource = Resource(slots - other.slots)
 
-  def >=(other : Resource): Boolean = !(this < other)
+  def >(other: Resource): Boolean = slots > other.slots
 
-  def <(other : Resource): Boolean = slots < other.slots
+  def >=(other: Resource): Boolean = !(this < other)
 
-  def <=(other : Resource): Boolean = !(this > other)
+  def <(other: Resource): Boolean = slots < other.slots
 
-  def equals(other : Resource): Boolean = slots == other.slots
+  def <=(other: Resource): Boolean = !(this > other)
 
-  def isEmpty: Boolean = slots == 0
+  def isEmpty: Boolean = {
+    slots == 0
+  }
 }
 
-object Priority extends Enumeration{
+/**
+ * Each streaming job can have a priority; jobs with higher priority
+ * will get resources scheduled earlier than those with lower priority.
+ */
+object Priority extends Enumeration {
   type Priority = Value
   val LOW, NORMAL, HIGH = Value
 }
 
-object Relaxation extends Enumeration{
+/**
+ * Relaxation.ONEWORKER means only resource (slot) from that worker will be accepted by
+ * the requestor application job.
+ */
+object Relaxation extends Enumeration {
   type Relaxation = Value
 
   // Option ONEWORKER allow user to schedule a task on specific worker.
   val ANY, ONEWORKER, SPECIFICWORKER = Value
 }
 
-import Relaxation._
-import Priority._
-case class ResourceRequest(resource: Resource,  workerId: WorkerId, priority: Priority = NORMAL, relaxation: Relaxation = ANY, executorNum: Int = 1)
+import io.gearpump.cluster.scheduler.Priority._
+import io.gearpump.cluster.scheduler.Relaxation._
+
+case class ResourceRequest(
+    resource: Resource, workerId: WorkerId, priority: Priority = NORMAL,
+    relaxation: Relaxation = ANY, executorNum: Int = 1)
 
-case class ResourceAllocation(resource : Resource, worker : ActorRef, workerId : WorkerId)
+case class ResourceAllocation(resource: Resource, worker: ActorRef, workerId: WorkerId)
 
 object Resource {
-  def empty = new Resource(0)
+  def empty: Resource = new Resource(0)
 
-  def min(res1: Resource, res2: Resource) = if (res1.slots < res2.slots) res1 else res2
+  def min(res1: Resource, res2: Resource): Resource = if (res1.slots < res2.slots) res1 else res2
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala b/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
index a7e7dd2..8581467 100644
--- a/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
+++ b/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,37 +15,42 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.cluster.worker
 
 import com.typesafe.config.Config
-import io.gearpump.util.RichProcess
+
 import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.util.RichProcess
 
 /**
-  * ExecutorProcessLauncher is used to launch a process for Executor using given parameters.
-  * User can implement this interface to decide the behavior of launching a process.
-  * Set "gearpump.worker.executor-process-launcher" to your implemented class name.
-  */
+ * ExecutorProcessLauncher is used to launch a process for Executor using given parameters.
+ *
+ * User can implement this interface to decide the behavior of launching a process.
+ * Set "gearpump.worker.executor-process-launcher" to your implemented class name.
+ */
 trait ExecutorProcessLauncher {
   val config: Config
 
   /**
-    * This function will launch a process for Executor using given parameters.
-    * @param appId The appId of the executor to be launched
-    * @param executorId The executorId of the executor to be launched
-    * @param resource The resource allocated for that executor
-    * @param options The command options
-    * @param classPath The classpath of the process
-    * @param mainClass The main class of the process
-    * @param arguments The rest arguments
-    */
-  def createProcess(appId: Int, executorId:Int, resource: Resource, config: Config, options : Array[String],
-    classPath : Array[String], mainClass : String, arguments : Array[String]): RichProcess
+   * This function launches a process for Executor using given parameters.
+   *
+   * @param appId The appId of the executor to be launched
+   * @param executorId The executorId of the executor to be launched
+   * @param resource The resource allocated for that executor
+   * @param options The command options
+   * @param classPath The classpath of the process
+   * @param mainClass The main class of the process
+   * @param arguments The rest arguments
+   */
+  def createProcess(
+      appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
+      classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess
 
   /**
-    * This function will clean resources for a launched process.
-    * @param appId The appId of the launched executor
-    * @param executorId The executorId of launched executor
-    */
+   * This function will clean resources for a launched process.
+   * @param appId The appId of the launched executor
+   * @param executorId The executorId of launched executor
+   */
   def cleanProcess(appId: Int, executorId: Int): Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala
new file mode 100644
index 0000000..24c6ad2
--- /dev/null
+++ b/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.cluster.worker
+
+/**
+ * WorkerId is used to uniquely track a worker machine.
+ *
+ * @param sessionId sessionId is assigned by Master node for easy tracking. It is possible that
+ *                  sessionId is **NOT** unique, so always use WorkerId for comparison.
+ * @param registerTime the timestamp when a worker node registers itself with the master node
+ */
+case class WorkerId(sessionId: Int, registerTime: Long)
+
+object WorkerId {
+  val unspecified: WorkerId = new WorkerId(-1, 0L)
+
+  def render(workerId: WorkerId): String = {
+    workerId.registerTime + "_" + workerId.sessionId
+  }
+
+  def parse(str: String): WorkerId = {
+    val pair = str.split("_")
+    new WorkerId(pair(1).toInt, pair(0).toLong)
+  }
+
+  implicit val workerIdOrdering: Ordering[WorkerId] = {
+    new Ordering[WorkerId] {
+
+      /** Compare timestamp first, then id */
+      override def compare(x: WorkerId, y: WorkerId): Int = {
+        if (x.registerTime < y.registerTime) {
+          -1
+        } else if (x.registerTime == y.registerTime) {
+          if (x.sessionId < y.sessionId) {
+            -1
+          } else if (x.sessionId == y.sessionId) {
+            0
+          } else {
+            1
+          }
+        } else {
+          1
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
index ca700dc..cdf2d03 100644
--- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
+++ b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,32 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package io.gearpump.cluster.worker
 
-import akka.actor.ActorRef
-import io.gearpump.WorkerId
+package io.gearpump.cluster.worker
 import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
 
 /**
  * Worker summary information for REST API.
  */
 case class WorkerSummary(
-  workerId: WorkerId,
-  state: String,
-  actorPath: String,
-  aliveFor: Long,
-  logFile: String,
-  executors: Array[ExecutorSlots],
-  totalSlots: Int,
-  availableSlots: Int,
-  homeDirectory: String,
-  jvmName: String,
-  // Id used to uniquely identity this worker process in low level resource manager like YARN.
-  resourceManagerContainerId: String,
-  historyMetricsConfig: HistoryMetricsConfig = null)
+    workerId: WorkerId,
+    state: String,
+    actorPath: String,
+    aliveFor: Long,
+    logFile: String,
+    executors: Array[ExecutorSlots],
+    totalSlots: Int,
+    availableSlots: Int,
+    homeDirectory: String,
+    jvmName: String,
+    // Id used to uniquely identify this worker process in low level resource manager like YARN.
+    resourceManagerContainerId: String,
+    historyMetricsConfig: HistoryMetricsConfig = null)
 
-object WorkerSummary{
-  def empty = WorkerSummary(WorkerId.unspecified, "", "", 0L, "", Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "")
+object WorkerSummary {
+  def empty: WorkerSummary = {
+    WorkerSummary(WorkerId.unspecified, "", "", 0L, "",
+      Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "")
+  }
 }
 
 case class ExecutorSlots(appId: Int, executorId: Int, slots: Int)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala b/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
index ba01c94..54d5431 100644
--- a/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
+++ b/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,17 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.jarstore
 
 import java.io.File
 import java.net.URI
 import java.util.ServiceLoader
+import scala.collection.JavaConverters._
 
-import akka.actor.{ActorSystem, ActorRefFactory}
+import akka.actor.ActorSystem
 import com.typesafe.config.Config
-import io.gearpump.util.{Constants, Util}
 
-import scala.collection.JavaConverters._
+import io.gearpump.util.{Constants, Util}
 
 case class FilePath(path: String)
 
@@ -39,7 +40,7 @@ trait JarStoreService {
    * Like "hdfs" for HDFS file system, and "file" for a local
    * file system.
    */
-  val scheme : String
+  val scheme: String
 
   /**
    * Init the Jar Store.
@@ -47,13 +48,14 @@ trait JarStoreService {
   def init(config: Config, system: ActorSystem)
 
   /**
-    * This function will copy the local file to the remote JarStore, called from client side.
+   * This function will copy the local file to the remote JarStore, called from client side.
    * @param localFile The local file
    */
   def copyFromLocal(localFile: File): FilePath
 
   /**
-    * This function will copy the remote file to local file system, called from client side.
+   * This function will copy the remote file to local file system, called from client side.
+   *
    * @param localFile The destination of file path
    * @param remotePath The remote file path from JarStore
    */
@@ -64,7 +66,8 @@ object JarStoreService {
 
   /**
    * Get a active JarStoreService by specifying a scheme.
-   * Please see config [[Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for more
+   *
+   * Please see config [[io.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for more
    * information.
    */
   def get(config: Config): JarStoreService = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala b/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
index 269f8f3..3a581fb 100644
--- a/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
+++ b/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,15 +18,14 @@
 
 package io.gearpump.metrics
 
+import scala.collection.JavaConverters._
+
 import akka.actor.{ActorRef, ActorSystem}
-import io.gearpump.metrics.Metrics.{Histogram => HistogramData, Meter => MeterData, Counter => CounterData, Gauge => GaugeData}
-import io.gearpump.codahale.metrics.MetricRegistry
 
-import akka.actor.ActorSystem
-import io.gearpump.codahale.metrics.{Gauge => CodaGauge}
+import io.gearpump.codahale.metrics.{Gauge => CodaGauge, MetricRegistry}
+import io.gearpump.metrics.Metrics.{Counter => CounterData, Gauge => GaugeData, Histogram => HistogramData, Meter => MeterData}
 import io.gearpump.metrics.MetricsReporterService.ReportTo
 import io.gearpump.util.LogUtil
-import scala.collection.JavaConverters._
 
 /**
  * A reporter class for logging metrics values to a remote actor periodically
@@ -34,7 +33,7 @@ import scala.collection.JavaConverters._
 class AkkaReporter(
     system: ActorSystem,
     registry: MetricRegistry)
-  extends ReportTo{
+  extends ReportTo {
   private val LOG = LogUtil.getLogger(getClass)
   LOG.info("Start Metrics AkkaReporter")
 
@@ -57,7 +56,7 @@ class AkkaReporter(
         s.get95thPercentile, s.get99thPercentile, s.get999thPercentile)
     }
 
-    meters.entrySet().asScala.foreach{pair =>
+    meters.entrySet().asScala.foreach { pair =>
       val key = pair.getKey
       val value = pair.getValue
       to ! MeterData(key,
@@ -67,7 +66,7 @@ class AkkaReporter(
         getRateUnit)
     }
 
-    gauges.entrySet().asScala.foreach {kv =>
+    gauges.entrySet().asScala.foreach { kv =>
       val value = kv.getValue.asInstanceOf[CodaGauge[Number]].getValue.longValue()
       to ! GaugeData(kv.getKey, value)
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/Counter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Counter.scala b/core/src/main/scala/io/gearpump/metrics/Counter.scala
index e0c9c57..70c7bae 100644
--- a/core/src/main/scala/io/gearpump/metrics/Counter.scala
+++ b/core/src/main/scala/io/gearpump/metrics/Counter.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,9 +21,9 @@ package io.gearpump.metrics
 import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter}
 
 /**
- * sampleRate: take a data point for every sampleRate...
+ * @see io.gearpump.codahale.metrics.Counter
  */
-class Counter(val name : String, counter : CodaHaleCounter, sampleRate : Int = 1) {
+class Counter(val name: String, counter: CodaHaleCounter, sampleRate: Int = 1) {
   private var sampleCount = 0L
   private var toBeIncremented = 0L
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/Histogram.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Histogram.scala b/core/src/main/scala/io/gearpump/metrics/Histogram.scala
index 2ad06c3..4673050 100644
--- a/core/src/main/scala/io/gearpump/metrics/Histogram.scala
+++ b/core/src/main/scala/io/gearpump/metrics/Histogram.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,23 +21,23 @@ package io.gearpump.metrics
 import io.gearpump.codahale.metrics.{Histogram => CodaHaleHistogram}
 
 /**
- * sampleRate: take a data point for every sampleRate...
+ * @see io.gearpump.codahale.metrics.Histogram
  */
-class Histogram(val name : String, hisgram : CodaHaleHistogram, sampleRate : Int = 1) {
+class Histogram(val name: String, histogram: CodaHaleHistogram, sampleRate: Int = 1) {
   private var sampleCount = 0L
 
   def update(value: Long) {
     sampleCount += 1
-    if (null != hisgram && sampleCount % sampleRate == 0) {
-      hisgram.update(value)
+    if (null != histogram && sampleCount % sampleRate == 0) {
+      histogram.update(value)
     }
   }
 
-  def getMean() : Double = {
-    hisgram.getSnapshot.getMean
+  def getMean(): Double = {
+    histogram.getSnapshot.getMean
   }
 
-  def getStdDev() : Double = {
-    hisgram.getSnapshot.getStdDev
+  def getStdDev(): Double = {
+    histogram.getSnapshot.getStdDev
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala b/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
index 606a033..28d420a 100644
--- a/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
+++ b/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
@@ -1,11 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package io.gearpump.metrics
 
 import java.util
-
-import io.gearpump.codahale.metrics.jvm.{ThreadStatesGaugeSet, MemoryUsageGaugeSet}
-import io.gearpump.codahale.metrics.{Gauge, Metric, MetricSet}
 import scala.collection.JavaConverters._
 
+import io.gearpump.codahale.metrics.jvm.{MemoryUsageGaugeSet, ThreadStatesGaugeSet}
+import io.gearpump.codahale.metrics.{Metric, MetricSet}
+
 class JvmMetricsSet(name: String) extends MetricSet {
 
   override def getMetrics: util.Map[String, Metric] = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/Meter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Meter.scala b/core/src/main/scala/io/gearpump/metrics/Meter.scala
index 660029c..ca79a37 100644
--- a/core/src/main/scala/io/gearpump/metrics/Meter.scala
+++ b/core/src/main/scala/io/gearpump/metrics/Meter.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,7 +20,8 @@ package io.gearpump.metrics
 
 import io.gearpump.codahale.metrics.{Meter => CodaHaleMeter}
 
-class Meter(val name : String, meter : CodaHaleMeter, sampleRate : Int = 1) {
+/** See io.gearpump.codahale.metrics.Meter */
+class Meter(val name: String, meter: CodaHaleMeter, sampleRate: Int = 1) {
   private var sampleCount = 0L
   private var toBeMarked = 0L
 
@@ -37,7 +38,7 @@ class Meter(val name : String, meter : CodaHaleMeter, sampleRate : Int = 1) {
     }
   }
 
-  def getOneMinuteRate() : Double = {
+  def getOneMinuteRate(): Double = {
     meter.getOneMinuteRate
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/Metrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Metrics.scala b/core/src/main/scala/io/gearpump/metrics/Metrics.scala
index ba9a59f..aad1af0 100644
--- a/core/src/main/scala/io/gearpump/metrics/Metrics.scala
+++ b/core/src/main/scala/io/gearpump/metrics/Metrics.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,39 +18,40 @@
 
 package io.gearpump.metrics
 
+import scala.collection.JavaConverters._
 
 import akka.actor._
+import org.slf4j.Logger
+
 import io.gearpump.codahale.metrics._
 import io.gearpump.metrics
 import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-
-import scala.collection.JavaConverters._
 
+/** Metric objects registry */
 class Metrics(sampleRate: Int) extends Extension {
 
   val registry = new MetricRegistry()
 
-  def meter(name : String) = {
+  def meter(name: String): metrics.Meter = {
     new metrics.Meter(name, registry.meter(name), sampleRate)
   }
 
-  def histogram(name : String) = {
+  def histogram(name: String): Histogram = {
     new Histogram(name, registry.histogram(name), sampleRate)
   }
 
-  def histogram(name : String, sampleRate: Int) = {
+  def histogram(name: String, sampleRate: Int): Histogram = {
     new Histogram(name, registry.histogram(name), sampleRate)
   }
 
-  def counter(name : String) = {
+  def counter(name: String): Counter = {
     new Counter(name, registry.counter(name), sampleRate)
   }
 
   def register(set: MetricSet): Unit = {
     val names = registry.getNames
-    val metrics = set.getMetrics.asScala.filterKeys {key => !names.contains(key)}
-    metrics.foreach{kv =>
+    val metrics = set.getMetrics.asScala.filterKeys { key => !names.contains(key) }
+    metrics.foreach { kv =>
       registry.register(kv._1, kv._2)
     }
   }
@@ -88,10 +89,10 @@ object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider {
     }
   }
 
-  case class Histogram
-      (name: String, mean: Double,
-       stddev: Double, median: Double,
-       p95: Double, p99: Double, p999: Double)
+  case class Histogram (
+      name: String, mean: Double,
+      stddev: Double, median: Double,
+      p95: Double, p99: Double, p999: Double)
     extends MetricType
 
   case class Counter(name: String, value: Long) extends MetricType
@@ -118,7 +119,7 @@ object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider {
 
   override def get(system: ActorSystem): Metrics = super.get(system)
 
-  override def lookup = Metrics
+  override def lookup: ExtensionId[Metrics] = Metrics
 
   override def createExtension(system: ExtendedActorSystem): Metrics = {
     val metricsEnabled = system.settings.config.getBoolean(GEARPUMP_METRIC_ENABLED)
@@ -133,27 +134,27 @@ object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider {
   }
 
   class DummyMetrics extends Metrics(1) {
-    override def register(set: MetricSet) = Unit
+    override def register(set: MetricSet): Unit = Unit
 
     private val meter = new metrics.Meter("", null) {
-      override def mark() = Unit
-      override  def mark(n: Long) = Unit
+      override def mark(): Unit = Unit
+      override def mark(n: Long): Unit = Unit
       override def getOneMinuteRate(): Double = 0
     }
 
     private val histogram = new metrics.Histogram("", null) {
-      override def update(value: Long) = Unit
-      override def getMean() : Double = 0
-      override def getStdDev() : Double = 0
+      override def update(value: Long): Unit = Unit
+      override def getMean(): Double = 0
+      override def getStdDev(): Double = 0
     }
 
     private val counter = new metrics.Counter("", null) {
-      override def inc() = Unit
-      override def inc(n: Long) = Unit
+      override def inc(): Unit = Unit
+      override def inc(n: Long): Unit = Unit
     }
 
-    override def meter(name : String) =  meter
-    override def histogram(name : String) = histogram
-    override def counter(name : String) = counter
+    override def meter(name: String): metrics.Meter = meter
+    override def histogram(name: String): metrics.Histogram = histogram
+    override def counter(name: String): metrics.Counter = counter
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala b/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
index 6c4c34f..f52a060 100644
--- a/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
+++ b/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,18 +18,15 @@
 
 package io.gearpump.metrics
 
-import java.util
-
 import io.gearpump.cluster.MasterToClient.HistoryMetricsItem
 
 /**
- * Will aggregate a full set of metrics into a smaller set
+ * Aggregates a larger set of metrics into a smaller set
  *
  * Sub Class must implement a constructor with signature like this:
- *  MetricsAggregator(config: Config)
- *
- *
+ * MetricsAggregator(config: Config)
  */
 trait MetricsAggregator {
-  def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem]
+  def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
+  : List[HistoryMetricsItem]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala b/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
index c5be041..05decdd 100644
--- a/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
+++ b/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,16 +20,22 @@ package io.gearpump.metrics
 
 import java.net.InetSocketAddress
 import java.util.concurrent.TimeUnit
+import scala.concurrent.duration._
+
+import akka.actor.{Actor, ActorRef}
 
-import akka.actor.{ActorRef, ExtendedActorSystem, Actor}
-import io.gearpump.codahale.metrics.{Slf4jReporter, MetricFilter, ScheduledReporter}
-import io.gearpump.codahale.metrics.graphite.{GraphiteReporter, Graphite}
-import io.gearpump.metrics.Metrics.{ReportMetrics, DemandMoreMetrics}
-import io.gearpump.metrics.MetricsReporterService.{ReportTo}
+import io.gearpump.codahale.metrics.graphite.{Graphite, GraphiteReporter}
+import io.gearpump.codahale.metrics.{MetricFilter, Slf4jReporter}
+import io.gearpump.metrics.Metrics.{DemandMoreMetrics, ReportMetrics}
+import io.gearpump.metrics.MetricsReporterService.ReportTo
 import io.gearpump.util.Constants._
 import io.gearpump.util.LogUtil
-import scala.concurrent.duration._
 
+/**
+ * Reports the metrics data to a target such as Graphite, a remote Akka actor, or log files.
+ *
+ * @param metrics Holds a list of metrics object.
+ */
 class MetricsReporterService(metrics: Metrics) extends Actor {
 
   private val LOG = LogUtil.getLogger(getClass)
@@ -40,14 +46,15 @@ class MetricsReporterService(metrics: Metrics) extends Actor {
   implicit val dispatcher = context.dispatcher
 
   def receive: Receive = {
+    // The subscriber is demanding more messages.
     case DemandMoreMetrics(subscriber) => {
       reporter.report(subscriber)
-      context.system.scheduler.scheduleOnce(reportInterval milliseconds,
+      context.system.scheduler.scheduleOnce(reportInterval.milliseconds,
         subscriber, ReportMetrics)
     }
   }
 
-  def startGraphiteReporter = {
+  def startGraphiteReporter(): ReportTo = {
     val graphiteHost = system.settings.config.getString(GEARPUMP_METRIC_GRAPHITE_HOST)
     val graphitePort = system.settings.config.getInt(GEARPUMP_METRIC_GRAPHITE_PORT)
 
@@ -64,7 +71,7 @@ class MetricsReporterService(metrics: Metrics) extends Actor {
     }
   }
 
-  def startSlf4jReporter = {
+  def startSlf4jReporter(): ReportTo = {
     new ReportTo {
       val reporter = Slf4jReporter.forRegistry(metrics.registry)
         .convertRatesTo(TimeUnit.SECONDS)
@@ -77,7 +84,7 @@ class MetricsReporterService(metrics: Metrics) extends Actor {
     }
   }
 
-  def startAkkaReporter = {
+  def startAkkaReporter(): ReportTo = {
     new AkkaReporter(system, metrics.registry)
   }
 
@@ -85,16 +92,18 @@ class MetricsReporterService(metrics: Metrics) extends Actor {
     val reporterType = system.settings.config.getString(GEARPUMP_METRIC_REPORTER)
     LOG.info(s"Metrics reporter is enabled, using $reporterType reporter")
     val reporter = reporterType match {
-      case "graphite" => startGraphiteReporter
-      case "logfile" => startSlf4jReporter
-      case "akka" => startAkkaReporter
+      case "graphite" => startGraphiteReporter()
+      case "logfile" => startSlf4jReporter()
+      case "akka" => startAkkaReporter()
     }
     reporter
   }
 }
 
 object MetricsReporterService {
-  trait ReportTo{
+
+  /** Target to which the user wants to report the metrics data. */
+  trait ReportTo {
     def report(to: ActorRef): Unit
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/package.scala b/core/src/main/scala/io/gearpump/package.scala
index 1ed94a7..1877651 100644
--- a/core/src/main/scala/io/gearpump/package.scala
+++ b/core/src/main/scala/io/gearpump/package.scala
@@ -1,50 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package io
 
 package object gearpump {
   type TimeStamp = Long
   val LatestTime = -1
-
-  /**
-   * WorkerId is used to uniquely track a worker machine.
-   *
-   * @param sessionId sessionId is assigned by Master node for easy tracking. It is possible that
-   *                  sessionId is **NOT** unique, so always use WorkerId for comparison.
-   * @param registerTime the timestamp when a worker node register itself to master node
-   */
-  case class WorkerId(sessionId: Int, registerTime: Long)
-
-  object WorkerId {
-    val unspecified: WorkerId = new WorkerId(-1, 0L)
-
-    def render(workerId: WorkerId): String = {
-      workerId.registerTime + "_" + workerId.sessionId
-    }
-
-    def parse(str: String): WorkerId = {
-      val pair = str.split("_")
-      new WorkerId(pair(1).toInt, pair(0).toLong)
-    }
-
-    implicit val workerIdOrdering: Ordering[WorkerId] = {
-      new Ordering[WorkerId] {
-
-        /** Compare timestamp first, then id */
-        override def compare(x: WorkerId, y: WorkerId): Int = {
-          if (x.registerTime < y.registerTime) {
-            -1
-          } else if (x.registerTime == y.registerTime) {
-            if (x.sessionId < y.sessionId) {
-              -1
-            } else if (x.sessionId == y.sessionId) {
-              0
-            } else {
-              1
-            }
-          } else {
-            1
-          }
-        }
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
index dba02ee..0b9c57e 100644
--- a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,11 +20,13 @@ package io.gearpump.partitioner
 
 import io.gearpump.Message
 
+/** Used by the storm module to broadcast a message to all downstream tasks. */
 class BroadcastPartitioner extends MulticastPartitioner {
   private var lastPartitionNum = -1
   private var partitions = Array.empty[Int]
 
-  override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
+  override def getPartitions(
+      msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
     if (partitionNum != lastPartitionNum) {
       partitions = (0 until partitionNum).toArray
       lastPartitionNum = partitionNum

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
index 3ed6dd4..062fc10 100644
--- a/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package io.gearpump.partitioner
 
 import io.gearpump.Message
@@ -7,7 +25,7 @@ import io.gearpump.Message
  * And each task in current processor will co-locate with task of last processor
  */
 class CoLocationPartitioner extends UnicastPartitioner {
-  override def getPartition(msg : Message, partitionNum : Int, currentPartitionId: Int) : Int = {
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
     currentPartitionId
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
index 5e4c7bc..6ba0cd6 100644
--- a/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -26,7 +26,7 @@ import io.gearpump.Message
  * same hash code after serialization and deserialization.
  */
 class HashPartitioner extends UnicastPartitioner {
-  override def getPartition(msg : Message, partitionNum : Int, currentPartitionId: Int) : Int = {
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
     (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
index 6285bb7..69104c7 100644
--- a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,22 +18,34 @@
 
 package io.gearpump.partitioner
 
-import io.gearpump.Message
 import scala.reflect.ClassTag
+
 import org.apache.commons.lang.SerializationUtils
 
+import io.gearpump.Message
+
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), the partitioner decides how ONE
+ * task of upstream processor A sends messages to several tasks of downstream processor B.
+ */
 sealed trait Partitioner extends Serializable
 
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does
+ * ONE-task {@literal ->} ONE-task mapping.
+ */
 trait UnicastPartitioner extends Partitioner {
-    /**
+
+  /**
+   * Gets the SINGLE downstream processor task index to send message to.
    *
-   * @param msg
-   * @param partitionNum
-   * @param currentPartitionId, used when the downstream processor want to share the same
-   *   partition id,
-   * @return
+   * @param msg Message you want to send
+   * @param partitionNum How many tasks the downstream processor has.
+   * @param upstreamTaskIndex Task index of the upstream task that triggers the getPartition() call.
+   *
+   * @return ONE task index of downstream processor.
    */
-  def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int
+  def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int
 
   def getPartition(msg: Message, partitionNum: Int): Int = {
     getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
@@ -41,14 +53,20 @@ trait UnicastPartitioner extends Partitioner {
 }
 
 trait MulticastPartitioner extends Partitioner {
-  def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int]
+
+  /**
+   * Gets a list of downstream processor task indexes to send message to.
+   *
+   * @param upstreamTaskIndex Current sender task's task index.
+   *
+   */
+  def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int]
 
   def getPartitions(msg: Message, partitionNum: Int): Array[Int] = {
     getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
   }
 }
 
-
 sealed trait PartitionerFactory {
 
   def name: String
@@ -56,26 +74,32 @@ sealed trait PartitionerFactory {
   def partitioner: Partitioner
 }
 
-class PartitionerObject(private [this] val _partitioner: Partitioner) extends PartitionerFactory with Serializable {
+/** Stores the Partitioner in an object. To use it, the user needs to deserialize the object. */
+class PartitionerObject(private[this] val _partitioner: Partitioner)
+  extends PartitionerFactory with Serializable {
 
   override def name: String = partitioner.getClass.getName
 
-  override def partitioner: Partitioner = SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
+  override def partitioner: Partitioner = {
+    SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
+  }
 }
 
-class PartitionerByClassName(partitionerClass: String) extends PartitionerFactory with Serializable {
-  override def name: String = partitionerClass
+/** Stores the partitioner by class name; the user needs to instantiate a new instance of it. */
+class PartitionerByClassName(partitionerClass: String)
+  extends PartitionerFactory with Serializable {
 
-  override def partitioner: Partitioner = Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner]
+  override def name: String = partitionerClass
+  override def partitioner: Partitioner = {
+    Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner]
+  }
 }
 
-
 /**
- * @param partitionerFactory
+ * @param partitionerFactory How we construct a Partitioner.
  */
 case class PartitionerDescription(partitionerFactory: PartitionerFactory)
 
-
 object Partitioner {
   val UNKNOWN_PARTITION_ID = -1
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
index 035c9d2..ff962fa 100644
--- a/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
@@ -15,11 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.partitioner
 
-import io.gearpump.Message
 import scala.util.Random
 
+import io.gearpump.Message
+
 /**
  * The idea of ShuffleGroupingPartitioner is derived from Storm.
  * Messages are randomly distributed across the downstream's tasks in a way such that

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
index d0d3c39..6b3c26e 100644
--- a/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,17 +23,16 @@ import java.util.Random
 import io.gearpump.Message
 
 /**
- * Round Robin partition the data.
+ * Round Robin partition the data to downstream processor tasks.
  */
 class ShufflePartitioner extends UnicastPartitioner {
   private var seed = 0
   private var count = 0
 
-
-  override def getPartition(msg : Message, partitionNum : Int, currentPartitionId: Int) : Int = {
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
 
     if (seed == 0) {
-      seed = newSeed
+      seed = newSeed()
     }
 
     val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum
@@ -41,5 +40,5 @@ class ShufflePartitioner extends UnicastPartitioner {
     result
   }
 
-  def newSeed = new Random().nextInt()
+  private def newSeed(): Int = new Random().nextInt()
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/security/Authenticator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/security/Authenticator.scala b/core/src/main/scala/io/gearpump/security/Authenticator.scala
index afb13a9..73bc8e1 100644
--- a/core/src/main/scala/io/gearpump/security/Authenticator.scala
+++ b/core/src/main/scala/io/gearpump/security/Authenticator.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,23 +17,21 @@
  */
 
 package io.gearpump.security
-import io.gearpump.security.Authenticator.AuthenticationResult
-
 import scala.concurrent.{ExecutionContext, Future}
 
+import io.gearpump.security.Authenticator.AuthenticationResult
 
 /**
  * Authenticator for UI dashboard.
  *
  * Sub Class must implement a constructor with signature like this:
- *  this(config: Config)
- *
+ * this(config: Config)
  */
 trait Authenticator {
 
-  // TODO: Change the signature to return more attributes of user
-  // credentials...
-  def authenticate(user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult]
+  // TODO: Change the signature to return more attributes of user credentials...
+  def authenticate(
+      user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult]
 }
 
 object Authenticator {
@@ -45,22 +43,25 @@ object Authenticator {
     def permissionLevel: Int
   }
 
-  val UnAuthenticated = new AuthenticationResult{
+  val UnAuthenticated = new AuthenticationResult {
     override val authenticated = false
     override val permissionLevel = -1
   }
 
-  val Guest = new AuthenticationResult{
+  /** Guest can view but have no permission to submit app or write */
+  val Guest = new AuthenticationResult {
     override val authenticated = true
     override val permissionLevel = 1000
   }
 
-  val User = new AuthenticationResult{
+  /** User can submit app, kill app, but has no permission to add or remove machines */
+  val User = new AuthenticationResult {
     override val authenticated = true
     override val permissionLevel = 1000 + Guest.permissionLevel
   }
 
-  val Admin = new AuthenticationResult{
+  /** Super user */
+  val Admin = new AuthenticationResult {
     override val authenticated = true
     override val permissionLevel = 1000 + User.permissionLevel
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala b/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
index 3ecfd88..0743a3f 100644
--- a/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
+++ b/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,10 +18,12 @@
 
 package io.gearpump.security
 
+import scala.concurrent.{ExecutionContext, Future}
+
+import com.typesafe.config.Config
+
 import io.gearpump.security.Authenticator.AuthenticationResult
 import io.gearpump.security.ConfigFileBasedAuthenticator._
-import com.typesafe.config.Config
-import scala.concurrent.{ExecutionContext, Future}
 
 object ConfigFileBasedAuthenticator {
 
@@ -30,7 +32,9 @@ object ConfigFileBasedAuthenticator {
   private val USERS = ROOT + "." + "users"
   private val GUESTS = ROOT + "." + "guests"
 
-  private case class Credentials(admins: Map[String, String], users: Map[String, String], guests: Map[String, String]) {
+  private case class Credentials(
+      admins: Map[String, String], users: Map[String, String], guests: Map[String, String]) {
+
     def verify(user: String, password: String): AuthenticationResult = {
       if (admins.contains(user)) {
         if (verify(user, password, admins)) {
@@ -70,26 +74,32 @@ object ConfigFileBasedAuthenticator {
  * users have limited permission to submit an application and etc..
  * guests can not submit/kill applications, but can view the application status.
  *
- * see conf/gear.conf section gearpump.ui-security.config-file-based-authenticator to find information
- * about how to configure this authenticator.
+ * see conf/gear.conf section gearpump.ui-security.config-file-based-authenticator to find
+ * information about how to configure this authenticator.
  *
  * [Security consideration]
- * It will keep one-way sha1 digest of password instead of password itself. The original password is NOT
- * kept in any way, so generally it is safe.
+ * It will keep one-way sha1 digest of password instead of password itself. The original password is
+ * NOT kept in any way, so generally it is safe.
  *
- * digesting flow (from original password to digest):
- * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) -> base64Encode
  *
- * verification user input password with stored digest:
- * base64Decode -> extract salt -> do sha1(salt, password) -> generate digest: salt + sha1 ->
- * compare the generated digest with the stored digest.
+ * digesting flow (from original password to digest):
+ * {{{
+ * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) ->
+ * base64Encode.
+ * }}}
  *
+ * Verifying the user input password against the stored digest:
+ * {{{
+ * base64Decode -> extract salt -> do sha1(salt, password) -> generate digest:
+ * salt + sha1 -> compare the generated digest with the stored digest.
+ * }}}
  */
 class ConfigFileBasedAuthenticator(config: Config) extends Authenticator {
 
   private val credentials = loadCredentials(config)
 
-  override def authenticate(user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult] = {
+  override def authenticate(user: String, password: String, ec: ExecutionContext)
+    : Future[AuthenticationResult] = {
     implicit val ctx = ec
     Future {
       credentials.verify(user, password)
@@ -97,13 +107,13 @@ class ConfigFileBasedAuthenticator(config: Config) extends Authenticator {
   }
 
   private def loadCredentials(config: Config): Credentials = {
-    val admins  = configToMap(config, ADMINS)
-    val users  = configToMap(config, USERS)
+    val admins = configToMap(config, ADMINS)
+    val users = configToMap(config, USERS)
     val guests = configToMap(config, GUESTS)
     new Credentials(admins, users, guests)
   }
 
-  private def configToMap(config : Config, path: String) = {
+  private def configToMap(config: Config, path: String) = {
     import scala.collection.JavaConverters._
     config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/security/PasswordUtil.scala b/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
index f8eafdd..9bf40d2 100644
--- a/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
+++ b/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,9 +19,10 @@
 package io.gearpump.security
 
 import java.security.MessageDigest
-import sun.misc.{BASE64Decoder, BASE64Encoder}
 import scala.util.Try
 
+import sun.misc.{BASE64Decoder, BASE64Encoder}
+
 /**
  * Util to verify whether user input password is valid or not.
  * It use sha1 to do the digesting.
@@ -30,9 +31,11 @@ object PasswordUtil {
   private val SALT_LENGTH = 8
 
   /**
-   * verification user input password with stored digest:
+   * Verifies user input password with stored digest:
+   * {{{
    * base64Decode -> extract salt -> do sha1(salt, password) ->
    * generate digest: salt + sha1 -> compare the generated digest with the stored digest.
+   * }}}
    */
   def verify(password: String, stored: String): Boolean = {
     Try {
@@ -45,7 +48,10 @@ object PasswordUtil {
   }
   /**
    * digesting flow (from original password to digest):
-   * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) -> base64Encode
+   * {{{
+   * random salt byte array of length 8 ->
+   * byte array of (salt + sha1(salt, password)) -> base64Encode
+   * }}}
    */
   def hash(password: String): String = {
     // Salt generation 64 bits long
@@ -66,8 +72,8 @@ object PasswordUtil {
   }
 
   private def base64Encode(data: Array[Byte]): String = {
-     val endecoder = new BASE64Encoder()
-     endecoder.encode(data)
+    val endecoder = new BASE64Encoder()
+    endecoder.encode(data)
   }
 
   private def base64Decode(data: String): Array[Byte] = {
@@ -75,13 +81,14 @@ object PasswordUtil {
     decoder.decodeBuffer(data)
   }
 
-  private def help = {
+  // scalastyle:off println
+  private def help() = {
     Console.println("usage: gear io.gearpump.security.PasswordUtil -password <your password>")
   }
 
   def main(args: Array[String]): Unit = {
     if (args.length != 2 || args(0) != "-password") {
-      help
+      help()
     } else {
       val pass = args(1)
       val result = hash(pass)
@@ -90,4 +97,5 @@ object PasswordUtil {
       Console.println(result)
     }
   }
+  // scalastyle:on println
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
index a69ab43..cb9d563 100644
--- a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
+++ b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,12 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.serializer
 
 import akka.actor.ExtendedActorSystem
+
 import io.gearpump.cluster.UserConfig
 
-class FastKryoSerializationFramework extends SerializationFramework{
+/**
+ * A build-in serializer framework using kryo
+ *
+ * NOTE: The Kryo here is a shaded version by Gearpump
+ */
+class FastKryoSerializationFramework extends SerializationFramework {
   private var system: ExtendedActorSystem = null
 
   private lazy val pool = new ThreadLocal[Serializer]() {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
index 817cd84..57b7b5e 100644
--- a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
+++ b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,13 +19,14 @@
 package io.gearpump.serializer
 
 import akka.actor.ExtendedActorSystem
+
 import io.gearpump.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
+import io.gearpump.objenesis.strategy.StdInstantiatorStrategy
 import io.gearpump.romix.serialization.kryo.KryoSerializerWrapper
 import io.gearpump.serializer.FastKryoSerializer.KryoSerializationException
 import io.gearpump.util.LogUtil
-import io.gearpump.objenesis.strategy.StdInstantiatorStrategy
 
-class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer{
+class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer {
 
   private val LOG = LogUtil.getLogger(getClass)
   private val config = system.settings.config
@@ -37,7 +38,7 @@ class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer{
   kryo.setInstantiatorStrategy(strategy)
   private val kryoClazz = new GearpumpSerialization(config).customize(kryo)
 
-  override def serialize(message: Any) : Array[Byte] = {
+  override def serialize(message: Any): Array[Byte] = {
     try {
       kryoSerializer.toBinary(message)
     } catch {
@@ -72,7 +73,7 @@ class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer{
     }
   }
 
-  override def deserialize(msg : Array[Byte]): Any = {
+  override def deserialize(msg: Array[Byte]): Any = {
     kryoSerializer.fromBinary(msg)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala b/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
index 41ccaa4..a7eb6cf 100644
--- a/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
+++ b/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,17 @@
 
 package io.gearpump.serializer
 
-import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
 import com.typesafe.config.Config
-import io.gearpump.util.{Constants, LogUtil}
 import org.slf4j.Logger
 
+import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
+import io.gearpump.util.{Constants, LogUtil}
+
 class GearpumpSerialization(config: Config) {
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
-  def customize(kryo: Kryo): Unit  = {
+  def customize(kryo: Kryo): Unit = {
 
     val serializationMap = configToMap(config, Constants.GEARPUMP_SERIALIZERS)
 
@@ -37,21 +38,22 @@ class GearpumpSerialization(config: Config) {
 
       if (value == null || value.isEmpty) {
 
-        //Use default serializer for this class type
+        // Use default serializer for this class type
         kryo.register(keyClass)
       } else {
         val valueClass = Class.forName(value)
-        val register = kryo.register(keyClass, valueClass.newInstance().asInstanceOf[KryoSerializer[_]])
+        val register = kryo.register(keyClass,
+          valueClass.newInstance().asInstanceOf[KryoSerializer[_]])
         LOG.debug(s"Registering ${keyClass}, id: ${register.getId}")
       }
     }
     kryo.setReferences(false)
 
-    // require the user to register the class first before using
+    // Requires the user to register the class first before using
     kryo.setRegistrationRequired(true)
   }
 
-  private final def configToMap(config : Config, path: String) = {
+  private final def configToMap(config: Config, path: String) = {
     import scala.collection.JavaConverters._
     config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala b/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
index d466ccf..4947dcc 100644
--- a/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
+++ b/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,15 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.serializer
 
 import akka.actor.ExtendedActorSystem
+
 import io.gearpump.cluster.UserConfig
 
 /**
  * User are allowed to use a customized serialization framework by extending this
  * interface.
- *
  */
 trait SerializationFramework {
   def init(system: ExtendedActorSystem, config: UserConfig)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/Serializer.scala b/core/src/main/scala/io/gearpump/serializer/Serializer.scala
index 02e30db..ff8b147 100644
--- a/core/src/main/scala/io/gearpump/serializer/Serializer.scala
+++ b/core/src/main/scala/io/gearpump/serializer/Serializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,13 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.serializer
 
 /**
  * User defined message serializer
  */
 trait Serializer {
-  def serialize(message: Any) : Array[Byte]
+  def serialize(message: Any): Array[Byte]
 
-  def deserialize(msg : Array[Byte]): Any
+  def deserialize(msg: Array[Byte]): Any
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/Express.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/Express.scala b/core/src/main/scala/io/gearpump/transport/Express.scala
index e8f1b8e..101b841 100644
--- a/core/src/main/scala/io/gearpump/transport/Express.scala
+++ b/core/src/main/scala/io/gearpump/transport/Express.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,20 @@
 
 package io.gearpump.transport
 
+import scala.collection.immutable.LongMap
+import scala.concurrent._
+
 import akka.actor._
 import akka.agent.Agent
+import org.slf4j.Logger
+
 import io.gearpump.transport.netty.Client.Close
 import io.gearpump.transport.netty.{Context, TaskMessage}
 import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-
-import scala.collection.immutable.LongMap
-import scala.concurrent._
 
 trait ActorLookupById {
+
+  /** Lookup actor ref for local task actor by providing a TaskId (TaskId.toLong) */
   def lookupLocalActor(id: Long): Option[ActorRef]
 }
 
@@ -40,8 +43,9 @@ trait ActorLookupById {
  */
 class Express(val system: ExtendedActorSystem) extends Extension with ActorLookupById {
 
-  import io.gearpump.transport.Express._
   import system.dispatcher
+
+  import io.gearpump.transport.Express._
   val localActorMap = Agent(LongMap.empty[ActorRef])
   val remoteAddressMap = Agent(Map.empty[Long, HostPort])
 
@@ -59,15 +63,16 @@ class Express(val system: ExtendedActorSystem) extends Extension with ActorLooku
     LOG.info(s"binding to netty server $localHost")
 
     system.registerOnTermination(new Runnable {
-      override def run = context.close
+      override def run(): Unit = context.close()
     })
     (context, serverPort, localHost)
   }
 
-  def unregisterLocalActor(id : Long) : Unit = {
+  def unregisterLocalActor(id: Long): Unit = {
     localActorMap.sendOff(_ - id)
   }
 
+  /** Start Netty client actors to connect to remote machines */
   def startClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
     val clientsToClose = remoteClientMap.get().filterKeys(!hostPorts.contains(_)).keySet
     closeClients(clientsToClose)
@@ -85,7 +90,7 @@ class Express(val system: ExtendedActorSystem) extends Extension with ActorLooku
 
   def closeClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
     remoteClientMap.alter { map =>
-      map.filterKeys(hostPorts.contains).foreach{ hostAndClient =>
+      map.filterKeys(hostPorts.contains).foreach { hostAndClient =>
         val (_, client) = hostAndClient
         client ! Close
       }
@@ -93,36 +98,38 @@ class Express(val system: ExtendedActorSystem) extends Extension with ActorLooku
     }
   }
 
-  def registerLocalActor(id : Long, actor: ActorRef): Unit = {
+  def registerLocalActor(id: Long, actor: ActorRef): Unit = {
     LOG.info(s"RegisterLocalActor: $id, actor: ${actor.path.name}")
     init
     localActorMap.sendOff(_ + (id -> actor))
   }
 
-  def lookupLocalActor(id: Long) = localActorMap.get().get(id)
+  def lookupLocalActor(id: Long): Option[ActorRef] = localActorMap.get().get(id)
 
-  def lookupRemoteAddress(id : Long) = remoteAddressMap.get().get(id)
+  def lookupRemoteAddress(id: Long): Option[HostPort] = remoteAddressMap.get().get(id)
 
-  //transport to remote address
+  /** Send message to remote task */
   def transport(taskMessage: TaskMessage, remote: HostPort): Unit = {
 
     val remoteClient = remoteClientMap.get.get(remote)
     if (remoteClient.isDefined) {
       remoteClient.get.tell(taskMessage, Actor.noSender)
     } else {
-      val errorMsg = s"Clients has not been launched properly before transporting messages, the destination is $remote"
+      val errorMsg = s"Clients has not been launched properly before transporting messages, " +
+        s"the destination is $remote"
       LOG.error(errorMsg)
       throw new Exception(errorMsg)
     }
   }
 }
 
+/** A customized transport layer by using Akka extension */
 object Express extends ExtensionId[Express] with ExtensionIdProvider {
   val LOG: Logger = LogUtil.getLogger(getClass)
 
   override def get(system: ActorSystem): Express = super.get(system)
 
-  override def lookup = Express
+  override def lookup: ExtensionId[Express] = Express
 
   override def createExtension(system: ExtendedActorSystem): Express = new Express(system)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/HostPort.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/HostPort.scala b/core/src/main/scala/io/gearpump/transport/HostPort.scala
index 72da203..40c4342 100644
--- a/core/src/main/scala/io/gearpump/transport/HostPort.scala
+++ b/core/src/main/scala/io/gearpump/transport/HostPort.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,7 +25,7 @@ case class HostPort(host: String, port: Int) {
 }
 
 object HostPort {
-  def apply(address : String) : HostPort = {
+  def apply(address: String): HostPort = {
     val hostAndPort = address.split(":")
     new HostPort(hostAndPort(0), hostAndPort(1).toInt)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/netty/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/Client.scala b/core/src/main/scala/io/gearpump/transport/netty/Client.scala
index 2ebf09d..d5960ad 100644
--- a/core/src/main/scala/io/gearpump/transport/netty/Client.scala
+++ b/core/src/main/scala/io/gearpump/transport/netty/Client.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,18 +23,22 @@ import java.nio.channels.ClosedChannelException
 import java.util
 import java.util.Random
 import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
+import scala.language.implicitConversions
 
 import akka.actor.Actor
-import io.gearpump.transport.HostPort
-import io.gearpump.util.LogUtil
 import org.jboss.netty.bootstrap.ClientBootstrap
 import org.jboss.netty.channel._
 import org.slf4j.Logger
 
-import scala.concurrent.duration.FiniteDuration
-import scala.language.implicitConversions
+import io.gearpump.transport.HostPort
+import io.gearpump.util.LogUtil
 
-class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) extends Actor {
+/**
+ * Netty Client implemented as an actor, on the other side, there is a netty server Actor.
+ * All messages sent to this actor will be forwarded to remote machine.
+ */
+class Client(conf: NettyConfig, factory: ChannelFactory, hostPort: HostPort) extends Actor {
   import io.gearpump.transport.netty.Client._
 
   val name = s"netty-client-$hostPort"
@@ -42,7 +46,7 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
   private final var bootstrap: ClientBootstrap = null
   private final val random: Random = new Random
   private val serializer = conf.newTransportSerializer
-  private var channel : Channel = null
+  private var channel: Channel = null
 
   var batch = new util.ArrayList[TaskMessage]
 
@@ -52,25 +56,26 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
     self ! Connect(0)
   }
 
-  def receive = messageHandler orElse connectionHandler
+  def receive: Receive = messageHandler orElse connectionHandler
 
-  def messageHandler : Receive = {
+  def messageHandler: Receive = {
     case msg: TaskMessage =>
       batch.add(msg)
-    case flush @ Flush(flushChannel)  =>
+    case flush@Flush(flushChannel) =>
       if (channel != flushChannel) {
-        Unit //Drop, as it belong to old channel flush message
+        Unit // Drop, as it belong to old channel flush message
       } else if (batch.size > 0 && flushChannel.isWritable) {
         send(flushChannel, batch.iterator)
         batch.clear()
         self ! flush
       } else {
         import context.dispatcher
-        context.system.scheduler.scheduleOnce(new FiniteDuration(conf.flushCheckInterval, TimeUnit.MILLISECONDS), self, flush)
+        context.system.scheduler.scheduleOnce(
+          new FiniteDuration(conf.flushCheckInterval, TimeUnit.MILLISECONDS), self, flush)
       }
   }
 
-  def connectionHandler : Receive = {
+  def connectionHandler: Receive = {
     case ChannelReady(channel) =>
       this.channel = channel
       self ! Flush(channel)
@@ -90,12 +95,12 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
       context.become(closed)
   }
 
-  def closed : Receive = {
-    case msg : AnyRef =>
+  def closed: Receive = {
+    case msg: AnyRef =>
       LOG.error(s"This client $name is closed, drop any message ${msg.getClass.getSimpleName}...")
   }
 
-  private def connect(tries: Int) : Unit = {
+  private def connect(tries: Int): Unit = {
     LOG.info(s"netty client try to connect to $name, tries: $tries")
     if (tries <= conf.max_retries) {
       val remote_addr = new InetSocketAddress(hostPort.host, hostPort.port)
@@ -107,7 +112,9 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
         LOG.error(s"failed to connect to $name, reason: ${ex.getMessage}, class: ${ex.getClass}")
         current.close()
         import context.dispatcher
-        context.system.scheduler.scheduleOnce(new FiniteDuration(getSleepTimeMs(tries), TimeUnit.MILLISECONDS), self, Connect(tries + 1))
+        context.system.scheduler.scheduleOnce(
+          new FiniteDuration(
+            getSleepTimeMs(tries), TimeUnit.MILLISECONDS), self, Connect(tries + 1))
       }
     } else {
       LOG.error(s"fail to connect to a remote host $name after retied $tries ...")
@@ -144,7 +151,7 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
     batch = null
   }
 
-  override def postStop() = {
+  override def postStop(): Unit = {
     close()
   }
 
@@ -154,9 +161,10 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
       if (channel.isOpen) {
         channel.close
       }
-      LOG.error(s"failed to send requests to ${channel.getRemoteAddress} ${ex.getClass.getSimpleName}")
+      LOG.error(s"failed to send requests " +
+        s"to ${channel.getRemoteAddress} ${ex.getClass.getSimpleName}")
       if (!ex.isInstanceOf[ClosedChannelException]) {
-          LOG.error(ex.getMessage, ex)
+        LOG.error(ex.getMessage, ex)
       }
       self ! CompareAndReconnectIfEqual(channel)
     }
@@ -175,18 +183,17 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
   private def isChannelWritable = (null != channel) && channel.isWritable
 }
 
-
 object Client {
   val LOG: Logger = LogUtil.getLogger(getClass)
 
-  //Reconnect if current channel equals channel
+  // Reconnect if current channel equals channel
   case class CompareAndReconnectIfEqual(channel: Channel)
 
   case class Connect(tries: Int)
-  case class ChannelReady(chanel : Channel)
+  case class ChannelReady(chanel: Channel)
   case object Close
 
-  case class Flush(channel : Channel)
+  case class Flush(channel: Channel)
 
   class ClientErrorHandler(name: String) extends SimpleChannelUpstreamHandler {
 
@@ -210,7 +217,9 @@ object Client {
     }
   }
 
-  implicit def channelFutureToChannelFutureOps(channel: ChannelFuture): ChannelFutureOps = new ChannelFutureOps(channel)
+  implicit def channelFutureToChannelFutureOps(channel: ChannelFuture): ChannelFutureOps = {
+    new ChannelFutureOps(channel)
+  }
 
   class ChannelFutureOps(channelFuture: ChannelFuture) {