You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:34 UTC

[24/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
index bee81fb..9cb7ca0 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,13 +19,13 @@
 package io.gearpump.streaming.source
 
 import io.gearpump.streaming.task.TaskContext
-import io.gearpump.{TimeStamp, Message}
+import io.gearpump.{Message, TimeStamp}
 
 /**
- * interface to implement custom source where data is read into the system.
+ * Interface to implement custom source where data is read into the system.
  * a DataSource could be a message queue like kafka or simply data generation source.
  *
- * an example would be like
+ * An example would be like
  * {{{
  *  GenStringSource extends DataSource {
  *
@@ -44,7 +44,7 @@ import io.gearpump.{TimeStamp, Message}
 trait DataSource extends java.io.Serializable {
 
   /**
-   * open connection to data source
+   * Opens connection to data source
    * invoked in onStart() method of [[io.gearpump.streaming.source.DataSourceTask]]
    * @param context is the task context at runtime
    * @param startTime is the start time of system
@@ -52,7 +52,7 @@ trait DataSource extends java.io.Serializable {
   def open(context: TaskContext, startTime: Option[TimeStamp]): Unit
 
   /**
-   * read a number of messages from data source.
+   * Reads a number of messages from data source.
    * invoked in each onNext() method of [[io.gearpump.streaming.source.DataSourceTask]]
    * @param batchSize max number of messages to read
    * @return a list of messages wrapped in [[io.gearpump.Message]]
@@ -60,7 +60,7 @@ trait DataSource extends java.io.Serializable {
   def read(batchSize: Int): List[Message]
 
   /**
-   * close connection to data source.
+   * Closes connection to data source.
    * invoked in onStop() method of [[io.gearpump.streaming.source.DataSourceTask]]
    */
   def close(): Unit

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala
index e7c0599..6ca939f 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,5 +22,4 @@ object DataSourceConfig {
 
   val SOURCE_READ_BATCH_SIZE = "gearpump.source.read.batch.size"
   val SOURCE_TIMESTAMP_FILTER = "gearpump.source.timestamp.filter.class"
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala
index 61eabcc..384b86a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,14 +19,15 @@
 package io.gearpump.streaming.source
 
 import akka.actor.ActorSystem
-import io.gearpump.streaming.Processor
+
 import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.Processor
 
 /**
- * utility that helps user to create a DAG starting with [[DataSourceTask]]
+ * Utility that helps user to create a DAG starting with [[DataSourceTask]]
  * user should pass in a [[DataSource]]
  *
- * here is an example to build a DAG that reads from Kafka source followed by word count
+ * Here is an example to build a DAG that reads from Kafka source followed by word count
  * {{{
  *    val source = new KafkaSource()
  *    val sourceProcessor =  DataSourceProcessor(source, 1)
@@ -36,10 +37,12 @@ import io.gearpump.cluster.UserConfig
  * }}}
  */
 object DataSourceProcessor {
-  def apply(dataSource: DataSource,
-            parallelism: Int,
-            description: String = "",
-            taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem): Processor[DataSourceTask] = {
+  def apply(
+      dataSource: DataSource,
+      parallelism: Int,
+      description: String = "",
+      taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
+    : Processor[DataSourceTask] = {
     Processor[DataSourceTask](parallelism, description = description,
       taskConf.withValue[DataSource](DataSourceTask.DATA_SOURCE, dataSource))
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
index 4c65100..d9b2110 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,26 +18,27 @@
 
 package io.gearpump.streaming.source
 
-import io.gearpump.streaming.task.{Task, StartTime, TaskContext}
 import io.gearpump._
 import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
 object DataSourceTask {
   val DATA_SOURCE = "data_source"
 }
 
 /**
- * general task that runs any [[DataSource]]
- * see [[DataSourceProcessor]] for its usage
+ * Task container for [[io.gearpump.streaming.source.DataSource]].
+ * See [[io.gearpump.streaming.source.DataSourceProcessor]] for its usage
  *
- * DataSourceTask calls
- *   - `DataSource.open` in `onStart` and pass in [[io.gearpump.streaming.task.TaskContext]] and application start time
- *   - `DataSource.read` in each `onNext`, which reads a batch of messages whose size are defined by
- *     `gearpump.source.read.batch.size`.
- *   - `DataSource.close` in `onStop`
+ * DataSourceTask calls:
+ *  - `DataSource.open()` in `onStart` and passes in [[io.gearpump.streaming.task.TaskContext]]
+ * and application start time
+ *  - `DataSource.read()` in each `onNext`, which reads a batch of messages whose size is
+ * defined by `gearpump.source.read.batch.size`.
+ *  - `DataSource.close()` in `onStop`
  */
 class DataSourceTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) {
-  import DataSourceTask._
+  import io.gearpump.streaming.source.DataSourceTask._
 
   private val source = conf.getValue[DataSource](DATA_SOURCE).get
   private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala
index a9ee674..b7d4e90 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,10 +19,10 @@
 package io.gearpump.streaming.source
 
 import io.gearpump.streaming.transaction.api.TimeStampFilter
-import io.gearpump.{TimeStamp, Message}
+import io.gearpump.{Message, TimeStamp}
 
 /**
- * default TimeStampFilter that filters out messages with smaller timestamps
+ * TimeStampFilter filters out messages which have an obsolete (smaller) timestamp.
  */
 class DefaultTimeStampFilter extends TimeStampFilter {
   override def filter(msg: Message, predicate: TimeStamp): Option[Message] = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala
index a25e20e..dfe3e93 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,10 +19,10 @@
 package io.gearpump.streaming.state.api
 
 trait Monoid[T] extends java.io.Serializable {
-   def plus(l: T, r: T): T
-   def zero: T
+  def plus(l: T, r: T): T
+  def zero: T
 }
 
 trait Group[T] extends Monoid[T] {
-   def minus(l: T, r: T): T
- }
\ No newline at end of file
+  def minus(l: T, r: T): T
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala
index dadbba6..238eab4 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -28,9 +28,9 @@ import io.gearpump.TimeStamp
  * the incoming value using monoid.plus to get a new state value
  */
 abstract class MonoidState[T](monoid: Monoid[T]) extends PersistentState[T] {
-  // left state updated by messages before checkpoint time
+  // Left state updated by messages before checkpoint time
   private[state] var left: T = monoid.zero
-  // right state updated by message after checkpoint time
+  // Right state updated by message after checkpoint time
   private[state] var right: T = monoid.zero
 
   protected var checkpointTime = Long.MaxValue

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala
index 6c595da..f1b923a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -30,33 +30,32 @@ import io.gearpump._
 trait PersistentState[T] {
 
   /**
-   * recover state to a previous checkpoint
+   * Recovers state to a previous checkpoint
    * usually invoked by the framework
    */
   def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit
 
   /**
-   * update state on a new message
+   * Updates state on a new message
    * this is invoked by user
    */
   def update(timestamp: TimeStamp, t: T): Unit
 
   /**
-   * set next checkpoint time
+   * Sets next checkpoint time
    * should be invoked by the framework
    */
   def setNextCheckpointTime(timeStamp: TimeStamp): Unit
 
   /**
-   * get a binary snapshot of state
+   * Gets a binary snapshot of state
    * usually invoked by the framework
    */
   def checkpoint(): Array[Byte]
 
   /**
-   * unwrap the raw value of state
+   * Unwraps the raw value of state
    */
   def get: Option[T]
 }
 
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
index fbf507f..a8a5d5d 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,16 +19,15 @@
 package io.gearpump.streaming.state.api
 
 import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
 
 import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.state.impl.{PersistentStateConfig, CheckpointManager}
+import io.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
 import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime, Task, TaskContext}
 import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
 import io.gearpump.util.LogUtil
 import io.gearpump.{Message, TimeStamp}
 
-import scala.concurrent.duration.FiniteDuration
-
 object PersistentTask {
   val CHECKPOINT = Message("checkpoint")
   val LOG = LogUtil.getLogger(getClass)
@@ -42,35 +41,32 @@ object PersistentTask {
  */
 abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
-  import io.gearpump.streaming.state.api.PersistentTask._
   import taskContext._
 
-  val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
+  import io.gearpump.streaming.state.api.PersistentTask._
+
+  val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
+    PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
   val checkpointStore = checkpointStoreFactory.getCheckpointStore(conf, taskContext)
   val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
   val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore)
-  // system time interval to attempt checkpoint
+  // System time interval to attempt checkpoint
   private val checkpointAttemptInterval = 1000L
 
   /**
-   * subclass should override this method to pass in
-   * a PersistentState
-   *
-   * the framework has already offered two states
-   *
-   *   - NonWindowState
-   *     state with no time or other boundary
-   *   - WindowState
-   *     each state is bounded by a time window
+   * Subclass should override this method to pass in a PersistentState. The framework has already
+   * offered two states:
+   *  - NonWindowState: state with no time or other boundary
+   *  - WindowState: each state is bounded by a time window
    */
   def persistentState: PersistentState[T]
 
   /**
-   * subclass should override this method to specify how a
-   * new message should update state
+   * Subclass should override this method to specify how a new message should update state
    */
   def processMessage(state: PersistentState[T], message: Message): Unit
 
+  /** Persistent state that will be stored (by checkpointing) automatically to storage like HDFS */
   val state = persistentState
 
   final override def onStart(startTime: StartTime): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala
index 4b8e2f5..f87b224 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,6 +21,6 @@ package io.gearpump.streaming.state.api
 import scala.util.Try
 
 trait Serializer[T] extends java.io.Serializable {
-   def serialize(t: T): Array[Byte]
-   def deserialize(bytes: Array[Byte]): Try[T]
- }
+  def serialize(t: T): Array[Byte]
+  def deserialize(bytes: Array[Byte]): Try[T]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
index 5fcbbcb..76c91c7 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,6 +21,7 @@ package io.gearpump.streaming.state.impl
 import io.gearpump.TimeStamp
 import io.gearpump.streaming.transaction.api.CheckpointStore
 
+/** Manages physical checkpoints to persistent storage like HDFS */
 class CheckpointManager(checkpointInterval: Long,
     checkpointStore: CheckpointStore) {
 
@@ -34,7 +35,7 @@ class CheckpointManager(checkpointInterval: Long,
   def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Option[TimeStamp] = {
     checkpointStore.persist(timestamp, checkpoint)
     checkpointTime = checkpointTime.collect { case time if maxMessageTime > time =>
-     time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval
+      time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval
     }
 
     checkpointTime

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
index 164b932..21623e3 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,7 +21,7 @@ package io.gearpump.streaming.state.impl
 import io.gearpump.TimeStamp
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.task.TaskContext
-import io.gearpump.streaming.transaction.api.{CheckpointStoreFactory, CheckpointStore}
+import io.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory}
 
 /**
  * an in memory store provided for test
@@ -41,7 +41,6 @@ class InMemoryCheckpointStore extends CheckpointStore {
   override def close(): Unit = {
     checkpoints = Map.empty[TimeStamp, Array[Byte]]
   }
-
 }
 
 class InMemoryCheckpointStoreFactory extends CheckpointStoreFactory {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala
index aefd7e1..dcd3918 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,20 +18,22 @@
 
 package io.gearpump.streaming.state.impl
 
+import org.slf4j.Logger
+
 import io.gearpump.TimeStamp
 import io.gearpump.streaming.state.api.{Monoid, MonoidState, Serializer}
-import io.gearpump.util.LogUtil
 import io.gearpump.streaming.state.impl.NonWindowState._
-import org.slf4j.Logger
+import io.gearpump.util.LogUtil
 
 object NonWindowState {
   val LOG: Logger = LogUtil.getLogger(classOf[NonWindowState[_]])
 }
 
 /**
-  * a MonoidState storing non-window state
+ * A MonoidState storing non-window state
  */
-class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T]) extends MonoidState[T](monoid) {
+class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T])
+  extends MonoidState[T](monoid) {
 
   override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
     serializer.deserialize(bytes).foreach(left = _)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala
index ecbb092..d7488d7 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala
index 13f0eef..63cdf06 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,7 +20,7 @@ package io.gearpump.streaming.state.impl
 import io.gearpump.TimeStamp
 
 /**
- * used in window applications
+ * Used in window applications
  * it keeps the current window and slide ahead when the window expires
  */
 class Window(val windowSize: Long, val windowStep: Long) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala
index 41395f6..d7d3776 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala
index 007c9a4..30382c0 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala
@@ -2,12 +2,12 @@
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
- * regarding cstateyright ownership.  The ASF licenses this file
+ * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a cstatey of the License at
+ * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,15 @@
 
 package io.gearpump.streaming.state.impl
 
+import scala.collection.immutable.TreeMap
+
+import org.slf4j.Logger
+
 import io.gearpump.TimeStamp
-import io.gearpump.streaming.state.api.{Group, Serializer, MonoidState}
-import io.gearpump.streaming.task.TaskContext
+import io.gearpump.streaming.state.api.{Group, MonoidState, Serializer}
 import io.gearpump.streaming.state.impl.WindowState._
+import io.gearpump.streaming.task.TaskContext
 import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-
-import scala.collection.immutable.TreeMap
 
 /**
  * an interval is a dynamic time range that is divided by window boundary and checkpoint time
@@ -51,10 +52,9 @@ object WindowState {
  * possible to undo the update by messages that have left the window
  */
 class WindowState[T](group: Group[T],
-                     serializer: Serializer[TreeMap[Interval, T]],
-                     taskContext: TaskContext,
-                     window: Window)
-  extends MonoidState[T](group) {
+    serializer: Serializer[TreeMap[Interval, T]],
+    taskContext: TaskContext,
+    window: Window) extends MonoidState[T](group) {
   /**
    * each interval has a state updated by message with timestamp in
    * [interval.startTime, interval.endTime)
@@ -67,11 +67,11 @@ class WindowState[T](group: Group[T],
     window.slideTo(timestamp)
     serializer.deserialize(bytes)
       .foreach { states =>
-      intervalStates = states
-      left = states.foldLeft(left) { case (accum, iter) =>
-        group.plus(accum, iter._2)
+        intervalStates = states
+        left = states.foldLeft(left) { case (accum, iter) =>
+          group.plus(accum, iter._2)
+        }
       }
-    }
   }
 
   override def update(timestamp: TimeStamp, t: T): Unit = {
@@ -115,14 +115,17 @@ class WindowState[T](group: Group[T],
   }
 
   /**
-   * each message will update state in corresponding Interval[StartTime, endTime),
+   * Each message will update state in corresponding Interval[StartTime, endTime),
+   *
    * which is decided by the message's timestamp t where
-   *    startTime = Math.max(lowerBound1, lowerBound2, checkpointTime)
-   *    endTime = Math.min(upperBound1, upperBound2, checkpointTime)
-   *    lowerBound1 = step * Nmax1 <= t
-   *    lowerBound2 = step * Nmax2 + size <= t
-   *    upperBound1 = step * Nmin1 > t
-   *    upperBound2 = step * Nmin2 + size > t
+   * {{{
+   * startTime = Math.max(lowerBound1, lowerBound2, checkpointTime)
+   * endTime = Math.min(upperBound1, upperBound2, checkpointTime)
+   * lowerBound1 = step * Nmax1 <= t
+   * lowerBound2 = step * Nmax2 + size <= t
+   * upperBound1 = step * Nmin1 > t
+   * upperBound2 = step * Nmin2 + size > t
+   * }}}
    */
   private[impl] def getInterval(timestamp: TimeStamp, checkpointTime: TimeStamp): Interval = {
     val windowSize = window.windowSize
@@ -144,7 +147,8 @@ class WindowState[T](group: Group[T],
     }
   }
 
-  private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, checkpointTime: TimeStamp): Unit = {
+  private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, checkpointTime: TimeStamp)
+  : Unit = {
     val interval = getInterval(timestamp, checkpointTime)
     intervalStates.get(interval) match {
       case Some(st) =>
@@ -154,8 +158,8 @@ class WindowState[T](group: Group[T],
     }
   }
 
-  private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp): TreeMap[Interval, T] = {
+  private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp)
+  : TreeMap[Interval, T] = {
     intervalStates.dropWhile(_._1.endTime <= startTime).takeWhile(_._1.endTime <= endTime)
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala b/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala
index dcdc2ff..962f48f 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,12 +21,10 @@ import scala.concurrent._
 
 /**
  * Generic storage to store KV Data.
- *
  */
 trait AppDataStore {
   def put(key: String, value: Any): Future[Any]
 
-  def get(key: String) : Future[Any]
+  def get(key: String): Future[Any]
 }
 
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala b/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
index 13e3dae..f1a19a8 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,12 +17,13 @@
  */
 package io.gearpump.streaming.storage
 
+import scala.concurrent.Future
+
 import akka.actor.ActorRef
 import akka.pattern.ask
-import io.gearpump.cluster.AppMasterToMaster.{GetAppDataResult, GetAppData, SaveAppData}
-import io.gearpump.util.Constants
 
-import scala.concurrent.Future
+import io.gearpump.cluster.AppMasterToMaster.{GetAppData, GetAppDataResult, SaveAppData}
+import io.gearpump.util.Constants
 
 /**
  * In memory application storage located on master nodes
@@ -36,8 +37,8 @@ class InMemoryAppStoreOnMaster(appId: Int, master: ActorRef) extends AppDataStor
   }
 
   override def get(key: String): Future[Any] = {
-    master.ask(GetAppData(appId, key)).asInstanceOf[Future[GetAppDataResult]].map{result =>
-      if(result.key.equals(key)) {
+    master.ask(GetAppData(appId, key)).asInstanceOf[Future[GetAppDataResult]].map { result =>
+      if (result.key.equals(key)) {
         result.value
       } else {
         null

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
index 2dbdd14..a3cb6e1 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,19 +19,16 @@
 package io.gearpump.streaming.task
 
 import akka.actor.{ActorRef, ExtendedActorSystem}
-import io.gearpump.serializer.SerializationFramework
+
+import io.gearpump.Message
 import io.gearpump.transport.netty.TaskMessage
 import io.gearpump.transport.{Express, HostPort}
-import io.gearpump.Message
-
-import scala.collection.mutable
 import io.gearpump.util.AkkaHelper
 /**
  * ExpressTransport wire the networking function from default akka
- * networking to customized implementation [[Express]].
- *
- * See [[Express]] for more information.
+ * networking to customized implementation [[io.gearpump.transport.Express]].
  *
+ * See [[io.gearpump.transport.Express]] for more information.
  */
 trait ExpressTransport {
   this: TaskActor =>
@@ -39,15 +36,15 @@ trait ExpressTransport {
   final val express = Express(context.system)
   implicit val system = context.system.asInstanceOf[ExtendedActorSystem]
 
-  final def local = express.localHost
+  final def local: HostPort = express.localHost
   lazy val sourceId = TaskId.toLong(taskId)
 
   lazy val sessionRef: ActorRef = {
     AkkaHelper.actorFor(system, s"/session#$sessionId")
   }
 
-  def transport(msg : AnyRef, remotes : TaskId *): Unit = {
-    var serializedMessage : AnyRef = null
+  def transport(msg: AnyRef, remotes: TaskId*): Unit = {
+    var serializedMessage: AnyRef = null
 
     remotes.foreach { remote =>
       val transportId = TaskId.toLong(remote)
@@ -69,7 +66,8 @@ trait ExpressTransport {
         if (remoteAddress.isDefined) {
           express.transport(taskMessage, remoteAddress.get)
         } else {
-          LOG.error(s"Can not find target task $remote, maybe the application is undergoing recovery")
+          LOG.error(
+            s"Can not find target task $remote, maybe the application is undergoing recovery")
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala
index ff2ca89..9f9bf1b 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala b/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala
index edac7f0..72bc7db 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala
@@ -15,19 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.streaming.task
 
-import io.gearpump.esotericsoftware.kryo.util.ObjectMap
-import io.gearpump.esotericsoftware.kryo.util.IntMap
+import io.gearpump.esotericsoftware.kryo.util.{IntMap, ObjectMap}
 import io.gearpump.streaming.task.SerializerResolver.Registration
 
 private[task] class SerializerResolver {
   private var classId = 0
-  val idToRegistration = new IntMap[Registration]()
-  val classToRegistration = new ObjectMap[Class[_], Registration]()
+  private val idToRegistration = new IntMap[Registration]()
+  private val classToRegistration = new ObjectMap[Class[_], Registration]()
 
-  def register(clazz: Class[_], serializer: TaskMessageSerializer[_]): Unit = {
-    val registration = Registration(classId, clazz, serializer)
+  def register[T](clazz: Class[T], serializer: TaskMessageSerializer[T]): Unit = {
+    val registration = new Registration(classId, clazz, serializer)
     idToRegistration.put(classId, registration)
     classToRegistration.put(clazz, registration)
     classId += 1
@@ -42,6 +42,6 @@ private[task] class SerializerResolver {
   }
 }
 
-object SerializerResolver{
-  case class Registration(id: Int, clazz: Class[_], serializer: TaskMessageSerializer[_])
+object SerializerResolver {
+  class Registration(val id: Int, val clazz: Class[_], val serializer: TaskMessageSerializer[_])
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala b/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala
index 75e3521..6bc8b15 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala
@@ -1,5 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package io.gearpump.streaming.task
 
 import io.gearpump.TimeStamp
 
-case class StartTime(startTime : TimeStamp = 0)
+/** Start time of streaming application. All messages older than start time will be dropped */
+case class StartTime(startTime: TimeStamp = 0)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala
index 52b8021..17d0b1b 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,12 +19,13 @@ package io.gearpump.streaming.task
 
 import java.io.{DataInput, DataOutput}
 
-import io.gearpump.streaming.{LatencyProbeSerializer, InitialAckRequestSerializer, AckRequestSerializer, AckSerializer}
+import org.slf4j.Logger
+
+import io.gearpump.streaming.{AckRequestSerializer, AckSerializer, InitialAckRequestSerializer, LatencyProbeSerializer}
 import io.gearpump.transport.netty.ITransportMessageSerializer
 import io.gearpump.util.LogUtil
-import org.slf4j.Logger
 
-class StreamingTransportSerializer extends ITransportMessageSerializer{
+class StreamingTransportSerializer extends ITransportMessageSerializer {
   private val log: Logger = LogUtil.getLogger(getClass)
   private val serializers = new SerializerResolver
 
@@ -36,7 +37,7 @@ class StreamingTransportSerializer extends ITransportMessageSerializer{
 
   override def serialize(dataOutput: DataOutput, obj: Object): Unit = {
     val registration = serializers.getRegistration(obj.getClass)
-    if(registration != null) {
+    if (registration != null) {
       dataOutput.writeInt(registration.id)
       registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].write(dataOutput, obj)
     } else {
@@ -47,7 +48,7 @@ class StreamingTransportSerializer extends ITransportMessageSerializer{
   override def deserialize(dataInput: DataInput, length: Int): Object = {
     val classID = dataInput.readInt()
     val registration = serializers.getRegistration(classID)
-    if(registration != null) {
+    if (registration != null) {
       registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].read(dataInput)
     } else {
       log.error(s"Can not find serializer for class id $classID")
@@ -57,7 +58,7 @@ class StreamingTransportSerializer extends ITransportMessageSerializer{
 
   override def getLength(obj: Object): Int = {
     val registration = serializers.getRegistration(obj.getClass)
-    if(registration != null) {
+    if (registration != null) {
       registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].getLength(obj) + 4
     } else {
       log.error(s"Can not find serializer for class type ${obj.getClass}")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala
index 72d46f7..d20074b 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,7 +19,7 @@
 package io.gearpump.streaming.task
 
 import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.{LifeTime, DAG}
+import io.gearpump.streaming.{DAG, LifeTime}
 
 /**
  * Each processor can have multiple downstream subscribers.
@@ -30,8 +30,8 @@ import io.gearpump.streaming.{LifeTime, DAG}
  * @param processorId subscriber processor Id
  * @param partitionerDescription subscriber partitioner
  */
-
-case class Subscriber(processorId: Int, partitionerDescription: PartitionerDescription, parallelism: Int, lifeTime: LifeTime)
+case class Subscriber(processorId: Int, partitionerDescription: PartitionerDescription,
+    parallelism: Int, lifeTime: LifeTime)
 
 object Subscriber {
 
@@ -50,7 +50,8 @@ object Subscriber {
     edges.foldLeft(List.empty[Subscriber]) { (list, nodeEdgeNode) =>
       val (_, partitioner, downstreamProcessorId) = nodeEdgeNode
       val downstreamProcessor = dag.processors(downstreamProcessorId)
-      list :+ Subscriber(downstreamProcessorId, partitioner, downstreamProcessor.parallelism, downstreamProcessor.life)
+      list :+ Subscriber(downstreamProcessorId, partitioner,
+        downstreamProcessor.parallelism, downstreamProcessor.life)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
index 0b1fa29..155edf4 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,20 +18,22 @@
 
 package io.gearpump.streaming.task
 
+import org.slf4j.Logger
+
 import io.gearpump.google.common.primitives.Shorts
-import io.gearpump.partitioner.{Partitioner, MulticastPartitioner, UnicastPartitioner}
+import io.gearpump.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner}
 import io.gearpump.streaming.AppMasterToExecutor.MsgLostException
 import io.gearpump.streaming.LifeTime
 import io.gearpump.streaming.task.Subscription._
 import io.gearpump.util.LogUtil
 import io.gearpump.{Message, TimeStamp}
-import org.slf4j.Logger
 
 /**
- * This manage the output and message clock for single downstream processor
+ * Manages the output and message clock for single downstream processor
  *
  * @param subscriber downstream processor
- * @param maxPendingMessageCount trigger flow control. Should be bigger than maxPendingMessageCountPerAckRequest
+ * @param maxPendingMessageCount trigger flow control. Should be bigger than
+ *                               maxPendingMessageCountPerAckRequest
  * @param ackOnceEveryMessageCount send on AckRequest to the target
  */
 class Subscription(
@@ -44,9 +46,10 @@ class Subscription(
     ackOnceEveryMessageCount: Int = ONE_ACKREQUEST_EVERY_MESSAGE_COUNT) {
 
   assert(maxPendingMessageCount >= ackOnceEveryMessageCount)
-  assert(maxPendingMessageCount  < Short.MaxValue / 2)
+  assert(maxPendingMessageCount < Short.MaxValue / 2)
 
-  val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId)
+  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId,
+    executor = executorId, task = taskId)
 
   import subscriber.{parallelism, partitionerDescription, processorId}
 
@@ -62,8 +65,8 @@ class Subscription(
 
   private var life = subscriber.lifeTime
 
-  val partitioner = partitionerDescription.partitionerFactory.partitioner
-  val sendFn = partitioner match {
+  private val partitioner = partitionerDescription.partitionerFactory.partitioner
+  private val sendFn = partitioner match {
     case up: UnicastPartitioner =>
       (msg: Message) => {
         val partition = up.getPartition(msg, parallelism, taskId.index)
@@ -74,14 +77,13 @@ class Subscription(
         val partitions = mp.getPartitions(msg, parallelism, taskId.index)
         partitions.map(partition => sendMessage(msg, partition)).sum
       }
-
   }
 
   def changeLife(life: LifeTime): Unit = {
     this.life = life
   }
 
-  def start: Unit = {
+  def start(): Unit = {
     val ackRequest = InitialAckRequest(taskId, sessionId)
     transport.transport(ackRequest, allTasks: _*)
   }
@@ -91,14 +93,16 @@ class Subscription(
   }
 
   /**
-   * Return how many message is actually sent by this subscription
+   * Returns how many messages are actually sent by this subscription
+   *
    * @param msg  the message to send
    * @param partition  the target partition to send message to
    * @return 1 if success
    */
   def sendMessage(msg: Message, partition: Int): Int = {
 
-    // only send message whose timestamp matches the lifeTime
+    var count = 0
+    // Only sends message whose timestamp matches the lifeTime
     if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(msg.timestamp)) {
 
       val targetTask = TaskId(processorId, partition)
@@ -117,24 +121,25 @@ class Subscription(
         (messageCount(partition) + ackOnceEveryMessageCount) / maxPendingMessageCount) {
         sendLatencyProbe(partition)
       }
-
-      return 1
+      count = 1
+      count
     } else {
       if (needFlush) {
-        flush
+        flush()
       }
-
-      return 0
+      count = 0
+      count
     }
   }
 
   private var lastFlushTime: Long = 0L
   private val FLUSH_INTERVAL = 5 * 1000 // ms
   private def needFlush: Boolean = {
-    System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL && Shorts.max(pendingMessageCount: _*) > 0
+    System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL &&
+      Shorts.max(pendingMessageCount: _*) > 0
   }
 
-  private def flush: Unit = {
+  private def flush(): Unit = {
     lastFlushTime = System.currentTimeMillis()
     allTasks.foreach { targetTaskId =>
       sendAckRequest(targetTaskId.index)
@@ -142,13 +147,14 @@ class Subscription(
   }
 
   private def allTasks: scala.collection.Seq[TaskId] = {
-    (0 until parallelism).map {taskIndex =>
+    (0 until parallelism).map { taskIndex =>
       TaskId(processorId, taskIndex)
     }
   }
 
-  /** Handle acknowledge message.
-    * Throw MessageLossException if required.
+  /**
+   * Handles acknowledge message. Throws MsgLostException if required.
+   *
    * @param ack acknowledge message received
    */
   def receiveAck(ack: Ack): Unit = {
@@ -159,7 +165,7 @@ class Subscription(
       if (ack.actualReceivedNum == ack.seq) {
         if ((ack.seq - candidateMinClockSince(index)).toShort >= 0) {
           if (ack.seq == messageCount(index)) {
-            // all messages have been acked.
+            // All messages have been acked.
             minClockValue(index) = Long.MaxValue
           } else {
             minClockValue(index) = candidateMinClock(index)
@@ -171,7 +177,8 @@ class Subscription(
         pendingMessageCount(ack.taskId.index) = (messageCount(ack.taskId.index) - ack.seq).toShort
         updateMaxPendingCount()
       } else {
-        LOG.error(s"Failed! received ack: $ack, received: ${ack.actualReceivedNum}, sent: ${ack.seq}, try to replay...")
+        LOG.error(s"Failed! received ack: $ack, received: ${ack.actualReceivedNum}, " +
+          s"sent: ${ack.seq}, try to replay...")
         throw new MsgLostException
       }
     }
@@ -181,13 +188,14 @@ class Subscription(
     minClockValue.min
   }
 
-  def allowSendingMoreMessages() : Boolean = {
+  def allowSendingMoreMessages(): Boolean = {
     maxPendingCount < maxPendingMessageCount
   }
 
   def sendAckRequestOnStallingTime(stallingTime: TimeStamp): Unit = {
     minClockValue.indices.foreach { i =>
-      if (minClockValue(i) == stallingTime && pendingMessageCount(i) > 0 && allowSendingMoreMessages) {
+      if (minClockValue(i) == stallingTime && pendingMessageCount(i) > 0
+        && allowSendingMoreMessages) {
         sendAckRequest(i)
         sendLatencyProbe(i)
       }
@@ -195,7 +203,7 @@ class Subscription(
   }
 
   private def sendAckRequest(partition: Int): Unit = {
-    // we increment more count for each AckRequest
+    // Increments more count for each AckRequest
     // to throttle the number of unacked AckRequest
     incrementMessageCount(partition, ackOnceEveryMessageCount)
     val targetTask = TaskId(processorId, partition)
@@ -218,11 +226,10 @@ class Subscription(
     val targetTask = TaskId(processorId, partition)
     transport.transport(probeLatency, targetTask)
   }
-
 }
 
 object Subscription {
-  //make sure it is smaller than MAX_PENDING_MESSAGE_COUNT
+  // Makes sure it is smaller than MAX_PENDING_MESSAGE_COUNT
   final val ONE_ACKREQUEST_EVERY_MESSAGE_COUNT = 100
   final val MAX_PENDING_MESSAGE_COUNT = 1000
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala
index a176ca7..212a659 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,15 @@
 
 package io.gearpump.streaming.task
 
+import scala.concurrent.duration.FiniteDuration
+
 import akka.actor.Actor.Receive
 import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
+import org.slf4j.Logger
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.util.LogUtil
 import io.gearpump.{Message, TimeStamp}
-import org.slf4j.Logger
-
-import scala.concurrent.duration.FiniteDuration
 
 /**
  * This provides context information for a task.
@@ -36,7 +37,7 @@ trait TaskContext {
 
   def executorId: Int
 
-  def appId : Int
+  def appId: Int
 
   def appName: String
 
@@ -44,7 +45,7 @@ trait TaskContext {
    * The actorRef of AppMaster
    * @return application master's actor reference
    */
-  def appMaster : ActorRef
+  def appMaster: ActorRef
 
   /**
    * The task parallelism
@@ -61,64 +62,62 @@ trait TaskContext {
    */
   def parallelism: Int
 
-
   /**
    * Please don't use this if possible.
    * @return  self actor ref
    */
-  //TODO: We should remove the self from TaskContext
+  // TODO: We should remove the self from TaskContext
   def self: ActorRef
 
   /**
    * Please don't use this if possible
    * @return the actor system
    */
-  //TODO: we should remove this in future
+  // TODO: we should remove this in future
   def system: ActorSystem
 
   /**
-   * This can be used to output messages to downstream tasks.
-   * The data shuffling rule can be decided by Partitioner.
+   * This can be used to output messages to downstream tasks. The data shuffling rule
+   * can be decided by Partitioner.
+   *
    * @param msg message to output
    */
-  def output(msg : Message) : Unit
-
+  def output(msg: Message): Unit
 
   def actorOf(props: Props): ActorRef
 
   def actorOf(props: Props, name: String): ActorRef
 
-  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: ⇒ Unit): Cancellable
+  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable
 
   /**
    * akka.actor.ActorRefProvider.scheduleOnce
+   *
    * @param initialDelay  the initial delay
    * @param f  the function to execute after initial delay
    * @return the executable
    */
-  def scheduleOnce(initialDelay: FiniteDuration)(f: ⇒ Unit): Cancellable
+  def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable
 
-   /**
+  /**
    * For managed message(type of Message), the sender only serve as a unique Id,
    * It's address is not something meaningful, you should not use this directly
    *
    * For unmanaged message, the sender represent the sender ActorRef
    * @return sender
    */
-   def sender: ActorRef
-
+  def sender: ActorRef
 
   /**
-   * retrieve upstream min clock from TaskActor
+   * Retrieves upstream min clock from TaskActor
+   *
    * @return the min clock
    */
   def upstreamMinClock: TimeStamp
 
-
   /**
-   * logger is environment dependant, it should be provided by
+   * Logger is environment dependent, it should be provided by
    * containing environment.
-   * @return
    */
   def logger: Logger
 }
@@ -130,28 +129,35 @@ trait TaskInterface {
 
   /**
    * Method called with the task is initialized.
-   * @param startTime startTime that can be used to decide from when a source producer task should replay the data source, or from when a processor task should recover its checkpoint data in to in-memory state.
+   * @param startTime startTime that can be used to decide from when a source producer task should
+   *                  replay the data source, or from when a processor task should recover its
+   *                  checkpoint data in to in-memory state.
    */
-  def onStart(startTime : StartTime) : Unit
+  def onStart(startTime: StartTime): Unit
 
-  /** Method called for each message received.
-   * @param msg message send by upstream tasks
+  /**
+   * Method called for each message received.
+   *
+   * @param msg Message sent by upstream tasks
    */
-  def onNext(msg : Message) : Unit
+  def onNext(msg: Message): Unit
 
-  /** Method called when task is under clean up.
+  /**
+   * Method called when task is under clean up.
+   *
    * This can be used to cleanup resource when the application finished.
    */
-  def onStop() : Unit
+  def onStop(): Unit
 
   /**
-   * handler for unmanaged message
-  * @return the handler
-  */
+   * Handles unmanaged messages
+   *
+   * @return the handler
+   */
   def receiveUnManagedMessage: Receive = null
 }
 
-abstract class Task(taskContext : TaskContext, userConf : UserConfig) extends TaskInterface{
+abstract class Task(taskContext: TaskContext, userConf: UserConfig) extends TaskInterface {
 
   import taskContext.{appId, executorId, taskId}
 
@@ -170,11 +176,11 @@ abstract class Task(taskContext : TaskContext, userConf : UserConfig) extends Ta
    */
   protected def sender: ActorRef = taskContext.sender
 
-  def onStart(startTime : StartTime) : Unit = {}
+  def onStart(startTime: StartTime): Unit = {}
 
-  def onNext(msg : Message) : Unit = {}
+  def onNext(msg: Message): Unit = {}
 
-  def onStop() : Unit = {}
+  def onStop(): Unit = {}
 
   override def receiveUnManagedMessage: Receive = {
     case msg =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
index d54fce5..b9b8829 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,6 +22,8 @@ import java.util
 import java.util.concurrent.TimeUnit
 
 import akka.actor._
+import org.slf4j.Logger
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
 import io.gearpump.metrics.Metrics
@@ -31,107 +33,107 @@ import io.gearpump.streaming.ExecutorToAppMaster._
 import io.gearpump.streaming.{Constants, ProcessorId}
 import io.gearpump.util.{LogUtil, TimeOutScheduler}
 import io.gearpump.{Message, TimeStamp}
-import org.slf4j.Logger
 
 /**
  *
- * All tasks of Gearpump runs inside a Actor.
- * TaskActor is the Actor container for a task.
+ * All tasks of Gearpump run inside an Actor. TaskActor is the Actor container for a task.
  */
 class TaskActor(
     val taskId: TaskId,
-    val taskContextData : TaskContextData,
-    userConf : UserConfig,
+    val taskContextData: TaskContextData,
+    userConf: UserConfig,
     val task: TaskWrapper,
     inputSerializerPool: SerializationFramework)
-  extends Actor with ExpressTransport  with TimeOutScheduler{
+    extends Actor with ExpressTransport with TimeOutScheduler {
   var upstreamMinClock: TimeStamp = 0L
   private var _minClock: TimeStamp = 0L
 
   def serializerPool: SerializationFramework = inputSerializerPool
 
-  import Constants._
-  import io.gearpump.streaming.task.TaskActor._
   import taskContextData._
+
+  import io.gearpump.streaming.Constants._
+  import io.gearpump.streaming.task.TaskActor._
   val config = context.system.settings.config
 
   val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId)
 
-  //metrics
+  // Metrics
   private val metricName = s"app${appId}.processor${taskId.processorId}.task${taskId.index}"
-  private val receiveLatency = Metrics(context.system).histogram(s"$metricName:receiveLatency", sampleRate = 1)
+  private val receiveLatency = Metrics(context.system).histogram(
+    s"$metricName:receiveLatency", sampleRate = 1)
   private val processTime = Metrics(context.system).histogram(s"$metricName:processTime")
   private val sendThroughput = Metrics(context.system).meter(s"$metricName:sendThroughput")
   private val receiveThroughput = Metrics(context.system).meter(s"$metricName:receiveThroughput")
 
   private val maxPendingMessageCount = config.getInt(GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT)
-  private val ackOnceEveryMessageCount =  config.getInt(GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT)
+  private val ackOnceEveryMessageCount = config.getInt(
+    GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT)
 
   private val executor = context.parent
   private var life = taskContextData.life
 
-  //latency probe
-  import context.dispatcher
-
+  // Latency probe
   import scala.concurrent.duration._
+
+  import context.dispatcher
   final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
 
-  // clock report interval
+  // Clock report interval
   final val CLOCK_REPORT_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
 
-  // flush interval
+  // Flush interval
   final val FLUSH_INTERVAL = FiniteDuration(100, TimeUnit.MILLISECONDS)
 
   private val queue = new util.LinkedList[AnyRef]()
 
   private var subscriptions = List.empty[(Int, Subscription)]
 
-  // securityChecker will be responsible of dropping messages from
+  // SecurityChecker will be responsible for dropping messages from
   // unknown sources
-  private val securityChecker  = new SecurityChecker(taskId, self)
+  private val securityChecker = new SecurityChecker(taskId, self)
   private[task] var sessionId = NONE_SESSION
 
-  //report to appMaster with my address
+  // Reports to appMaster with my address
   express.registerLocalActor(TaskId.toLong(taskId), self)
 
-  final def receive : Receive = null
+  final def receive: Receive = null
 
   task.setTaskActor(this)
 
-  def onStart(startTime : StartTime) : Unit = {
+  def onStart(startTime: StartTime): Unit = {
     task.onStart(startTime)
   }
 
-  def onNext(msg : Message) : Unit = task.onNext(msg)
+  def onNext(msg: Message): Unit = task.onNext(msg)
 
   def onUnManagedMessage(msg: Any): Unit = task.receiveUnManagedMessage.apply(msg)
 
-  def onStop() : Unit = task.onStop()
+  def onStop(): Unit = task.onStop()
 
   /**
    * output to a downstream by specifying a arrayIndex
-   * @param arrayIndex, this is not same as ProcessorId
-   * @param msg
+   * @param arrayIndex this is not same as ProcessorId
    */
-  def output(arrayIndex: Int, msg: Message) : Unit = {
+  def output(arrayIndex: Int, msg: Message): Unit = {
     var count = 0
-    count +=  this.subscriptions(arrayIndex)._2.sendMessage(msg)
+    count += this.subscriptions(arrayIndex)._2.sendMessage(msg)
     sendThroughput.mark(count)
   }
 
-  def output(msg : Message) : Unit = {
+  def output(msg: Message): Unit = {
     var count = 0
-    this.subscriptions.foreach{ subscription =>
+    this.subscriptions.foreach { subscription =>
       count += subscription._2.sendMessage(msg)
     }
     sendThroughput.mark(count)
   }
 
-  final override def postStop() : Unit = {
+  final override def postStop(): Unit = {
     onStop()
   }
 
-  final override def preStart() : Unit = {
+  final override def preStart(): Unit = {
     val register = RegisterTask(taskId, executorId, local)
     LOG.info(s"$register")
     executor ! register
@@ -154,7 +156,7 @@ class TaskActor(
         msg match {
           case SendAck(ack, targetTask) =>
             transport(ack, targetTask)
-          case m : Message =>
+          case m: Message =>
             count += 1
             onNext(m)
           case other =>
@@ -172,18 +174,18 @@ class TaskActor(
     }
   }
 
-  private def onStartClock: Unit = {
+  private def onStartClock(): Unit = {
     LOG.info(s"received start, clock: $upstreamMinClock, sessionId: $sessionId")
     subscriptions = subscribers.map { subscriber =>
-      (subscriber.processorId ,
+      (subscriber.processorId,
         new Subscription(appId, executorId, taskId, subscriber, sessionId, this,
           maxPendingMessageCount, ackOnceEveryMessageCount))
     }.sortBy(_._1)
 
-    subscriptions.foreach(_._2.start)
+    subscriptions.foreach(_._2.start())
 
     import scala.collection.JavaConverters._
-    stashQueue.asScala.foreach{item =>
+    stashQueue.asScala.foreach { item =>
       handleMessages(item.sender).apply(item.msg)
     }
     stashQueue.clear()
@@ -198,7 +200,7 @@ class TaskActor(
   }
 
   def waitForTaskRegistered: Receive = {
-    case start@ TaskRegistered(_, sessionId, startClock) =>
+    case start@TaskRegistered(_, sessionId, startClock) =>
       this.sessionId = sessionId
       this.upstreamMinClock = startClock
       context.become(waitForStartClock)
@@ -206,9 +208,9 @@ class TaskActor(
 
   private val stashQueue = new util.LinkedList[MessageAndSender]()
 
-  def waitForStartClock : Receive = {
+  def waitForStartClock: Receive = {
     case start: StartTask =>
-      onStartClock
+      onStartClock()
     case other: AnyRef =>
       stashQueue.add(MessageAndSender(other, sender()))
   }
@@ -221,8 +223,9 @@ class TaskActor(
         doHandleMessage()
       }
     case ackRequest: AckRequest =>
-      //enqueue to handle the ackRequest and send back ack later
-      val ackResponse = securityChecker.generateAckResponse(ackRequest, sender, ackOnceEveryMessageCount)
+      // Enqueue to handle the ackRequest and send back ack later
+      val ackResponse = securityChecker.generateAckResponse(ackRequest, sender,
+        ackOnceEveryMessageCount)
       if (null != ackResponse) {
         queue.add(SendAck(ackResponse, ackRequest.taskId))
         doHandleMessage()
@@ -231,16 +234,17 @@ class TaskActor(
       subscriptions.find(_._1 == ack.taskId.processorId).foreach(_._2.receiveAck(ack))
       doHandleMessage()
     case inputMessage: SerializedMessage =>
-      val message = Message(serializerPool.get().deserialize(inputMessage.bytes), inputMessage.timeStamp)
+      val message = Message(serializerPool.get().deserialize(inputMessage.bytes),
+        inputMessage.timeStamp)
       receiveMessage(message, sender)
     case inputMessage: Message =>
       receiveMessage(inputMessage, sender)
-    case upstream@ UpstreamMinClock(upstreamClock) =>
+    case upstream@UpstreamMinClock(upstreamClock) =>
       this.upstreamMinClock = upstreamClock
 
       val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
         val subMin = sub._2.minClock
-        // a subscription is holding back the _minClock;
+        // A subscription is holding back the _minClock;
         // we send AckRequest to its tasks to push _minClock forward
         if (subMin == _minClock) {
           sub._2.sendAckRequestOnStallingTime(_minClock)
@@ -255,7 +259,7 @@ class TaskActor(
         appMaster ! update
       }
 
-      // check whether current task is dead.
+      // Checks whether current task is dead.
       if (_minClock > life.death) {
         // There will be no more message received...
         val unRegister = UnRegisterTask(taskId, executorId)
@@ -273,11 +277,11 @@ class TaskActor(
           case Some(subscription) =>
             subscription.changeLife(subscriber.lifeTime cross this.life)
           case None =>
-            val subscription = new Subscription(appId, executorId, taskId, subscriber, sessionId, this,
-              maxPendingMessageCount, ackOnceEveryMessageCount)
-            subscription.start
-            subscriptions :+= (subscriber.processorId, subscription)
-            // sort, keep the order
+            val subscription = new Subscription(appId, executorId, taskId, subscriber,
+              sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount)
+            subscription.start()
+            subscriptions :+=(subscriber.processorId, subscription)
+            // Sorting, keep the order
             subscriptions = subscriptions.sortBy(_._1)
         }
       }
@@ -293,12 +297,12 @@ class TaskActor(
   }
 
   /**
-   * @return min clock of this task
+   * Returns min clock of this task
    */
   def minClock: TimeStamp = _minClock
 
   /**
-   * @return min clock of upstream task
+   * Returns min clock of upstream task
    */
   def getUpstreamMinClock: TimeStamp = upstreamMinClock
 
@@ -309,8 +313,8 @@ class TaskActor(
         queue.add(msg)
         doHandleMessage()
       case None =>
-        //Todo: Indicate the error and avoid the LOG flood
-        //LOG.error(s"Task $taskId drop message $msg")
+      // TODO: Indicate the error and avoid the LOG flood
+      // LOG.error(s"Task $taskId drop message $msg")
     }
   }
 
@@ -320,28 +324,28 @@ class TaskActor(
 }
 
 object TaskActor {
-  val CLOCK_SYNC_TIMEOUT_INTERVAL = 3 * 1000 //3 seconds
+  // 3 seconds
+  val CLOCK_SYNC_TIMEOUT_INTERVAL = 3 * 1000
 
   // If the message comes from an unknown source, securityChecker will drop it
-  class SecurityChecker(task_id: TaskId, self : ActorRef) {
+  class SecurityChecker(task_id: TaskId, self: ActorRef) {
 
     private val LOG: Logger = LogUtil.getLogger(getClass, task = task_id)
 
-    // Use mutable HashMap for performance optimization
+    // Uses mutable HashMap for performance optimization
     private val receivedMsgCount = new IntShortHashMap()
 
-
     // Tricky performance optimization to save memory.
     // We store the session Id in the uid of ActorPath
     // ActorPath.hashCode is same as uid.
     private def getSessionId(actor: ActorRef): Int = {
-      //TODO: As method uid is protected in [akka] package. We
+      // TODO: As method uid is protected in [akka] package. We
       // are using hashCode instead of uid.
       actor.hashCode()
     }
 
     def handleInitialAckRequest(ackRequest: InitialAckRequest): Ack = {
-      LOG.debug(s"Handle InitialAckRequest for session $ackRequest" )
+      LOG.debug(s"Handle InitialAckRequest for session $ackRequest")
       val sessionId = ackRequest.sessionId
       if (sessionId == NONE_SESSION) {
         LOG.error(s"SessionId is not initialized, ackRequest: $ackRequest")
@@ -355,7 +359,7 @@ object TaskActor {
     def generateAckResponse(ackRequest: AckRequest, sender: ActorRef, incrementCount: Int): Ack = {
       val sessionId = ackRequest.sessionId
       if (receivedMsgCount.containsKey(sessionId)) {
-        // we increment more count for each AckRequest
+        // Increments more count for each AckRequest
         // to throttle the number of unacked AckRequest
         receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + incrementCount).toShort)
         Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId)
@@ -366,8 +370,8 @@ object TaskActor {
     }
 
     // If the message comes from an unknown source, then drop it
-    def checkMessage(message : Message, sender: ActorRef): Option[Message] = {
-      if(sender.equals(self)){
+    def checkMessage(message: Message, sender: ActorRef): Option[Message] = {
+      if (sender.equals(self)) {
         Some(message)
       } else {
         val sessionId = getSessionId(sender)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala
index ce32606..28605cf 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala
@@ -19,13 +19,14 @@
 package io.gearpump.streaming.task
 
 import akka.actor.ActorRef
+
 import io.gearpump.streaming.LifeTime
 
 case class TaskContextData(
-    executorId : Int,
-    appId : Int,
+    executorId: Int,
+    appId: Int,
     appName: String,
-    appMaster : ActorRef,
+    appMaster: ActorRef,
     parallelism: Int,
     life: LifeTime,
     subscribers: List[Subscriber])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
index 59babff..d89387a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -32,9 +32,11 @@ case class InitialAckRequest(taskId: TaskId, sessionId: Int)
  */
 case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int)
 
-/*
-  Here the seq field represents the expected number of received messages
-    and the actualReceivedNum field means the actual received number since start
+/**
+ * Ack back to sender task actor.
+ *
+ * @param seq The seq field represents the expected number of received messages and the
+ *            actualReceivedNum field means the actual received number since start.
  */
 case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int)
 
@@ -60,6 +62,7 @@ case object GetStartClock
 
 case class StartClock(clock: TimeStamp)
 
+/** Probes the latency between upstream and downstream tasks. */
 case class LatencyProbe(timestamp: Long)
 
 case class SendMessageLoss()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
index f599540..66c3c52 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,9 +20,9 @@ package io.gearpump.streaming.task
 
 import io.gearpump.streaming._
 
-case class TaskId(processorId : ProcessorId, index : TaskIndex)
+case class TaskId(processorId: ProcessorId, index: TaskIndex)
 
 object TaskId {
-  def toLong(id : TaskId) = (id.processorId.toLong << 32) + id.index
-  def fromLong(id : Long) = TaskId(((id >> 32) & 0xFFFFFFFF).toInt, (id & 0xFFFFFFFF).toInt)
+  def toLong(id: TaskId): Long = (id.processorId.toLong << 32) + id.index
+  def fromLong(id: Long): TaskId = TaskId(((id >> 32) & 0xFFFFFFFF).toInt, (id & 0xFFFFFFFF).toInt)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
index 370eef7..500f8b3 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,8 +19,6 @@ package io.gearpump.streaming.task
 
 import java.io.{DataInput, DataOutput}
 
-import org.jboss.netty.buffer.ChannelBuffer
-
 trait TaskMessageSerializer[T] {
   def write(dataOutput: DataOutput, obj: T)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
index c6564ff..040fc2e 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
@@ -21,11 +21,12 @@ package io.gearpump.streaming.task
 object TaskUtil {
 
   /**
-   * Resolve a classname to a Task class.
+   * Resolves a classname to a Task class.
+   *
    * @param className  the class name to resolve
    * @return resolved class
    */
-  def loadClass(className: String): Class[_<:Task] = {
+  def loadClass(className: String): Class[_ <: Task] = {
     val loader = Thread.currentThread().getContextClassLoader()
     loader.loadClass(className).asSubclass(classOf[Task])
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
index 3be396f..e7e883c 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
@@ -18,22 +18,26 @@
 
 package io.gearpump.streaming.task
 
+import scala.concurrent.duration.FiniteDuration
+
 import akka.actor.Actor._
 import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
-import io.gearpump.{TimeStamp, Message}
-import io.gearpump.cluster.UserConfig
-import io.gearpump.util.LogUtil
 import org.slf4j.Logger
 
-import scala.concurrent.duration.FiniteDuration
+import io.gearpump.cluster.UserConfig
+import io.gearpump.util.LogUtil
+import io.gearpump.{Message, TimeStamp}
 
 /**
  * This provides TaskContext for user defined tasks
+ *
  * @param taskClass task class
  * @param context context class
  * @param userConf user config
  */
-class TaskWrapper(val taskId: TaskId, taskClass: Class[_ <: Task], context: TaskContextData, userConf: UserConfig) extends TaskContext with TaskInterface {
+class TaskWrapper(
+    val taskId: TaskId, taskClass: Class[_ <: Task], context: TaskContextData,
+    userConf: UserConfig) extends TaskContext with TaskInterface {
 
   private val LOG = LogUtil.getLogger(taskClass, task = taskId)
 
@@ -56,18 +60,19 @@ class TaskWrapper(val taskId: TaskId, taskClass: Class[_ <: Task], context: Task
   override def output(msg: Message): Unit = actor.output(msg)
 
   /**
-   * @see [[TaskActor.output]]
+   * See [[io.gearpump.streaming.task.TaskActor]] output(arrayIndex: Int, msg: Message): Unit
+   *
    * @param index, not same as ProcessorId
-   * @param msg
    */
   def output(index: Int, msg: Message): Unit = actor.output(index, msg)
 
   /**
    * Use with caution, output unmanaged message to target tasks
+   *
    * @param msg  message to output
    * @param tasks  the tasks to output to
    */
-  def outputUnManaged(msg: AnyRef, tasks: TaskId *): Unit = {
+  def outputUnManaged(msg: AnyRef, tasks: TaskId*): Unit = {
     actor.transport(msg, tasks: _*)
   }
 
@@ -105,12 +110,12 @@ class TaskWrapper(val taskId: TaskId, taskClass: Class[_ <: Task], context: Task
     actor.getUpstreamMinClock
   }
 
-  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: ⇒ Unit): Cancellable = {
+  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable = {
     val dispatcher = actor.context.system.dispatcher
     actor.context.system.scheduler.schedule(initialDelay, interval)(f)(dispatcher)
   }
 
-  def scheduleOnce(initialDelay: FiniteDuration)(f: ⇒ Unit): Cancellable = {
+  def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable = {
     val dispatcher = actor.context.system.dispatcher
     actor.context.system.scheduler.scheduleOnce(initialDelay)(f)(dispatcher)
   }
@@ -121,9 +126,8 @@ class TaskWrapper(val taskId: TaskId, taskClass: Class[_ <: Task], context: Task
   }
 
   /**
-   * logger is environment dependant, it should be provided by
+   * Logger is environment dependent, it should be provided by
    * containing environment.
-   * @return
    */
   override def logger: Logger = LOG
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
index 7dd08fb..f3894ea 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,9 +18,9 @@
 
 package io.gearpump.streaming.transaction.api
 
-import io.gearpump.streaming.task.TaskContext
 import io.gearpump.TimeStamp
 import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.TaskContext
 
 /**
  * CheckpointStore persistently stores mapping of timestamp to checkpoint