Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:52 UTC

[42/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/deployment-configuration.md
----------------------------------------------------------------------
diff --git a/docs/deployment-configuration.md b/docs/deployment-configuration.md
index d0ef37d..70b5500 100644
--- a/docs/deployment-configuration.md
+++ b/docs/deployment-configuration.md
@@ -29,7 +29,7 @@ gearpump{
 
 ## Configuration for user submitted application job
 
-For user application job, it will read configuration file `gear.conf` and `application.conf` from classpath, while `application.conf` has higher prioprty. 
+For a user-submitted application job, Gearpump reads the configuration files `gear.conf` and `application.conf` from the classpath, where `application.conf` has higher priority.
 The default classpath contains:
 
 1. `conf/`
@@ -37,7 +37,6 @@ The default classpath contains:
 
 For example, you can put a `application.conf` on your working directory, and then it will be effective when you submit a new job application.
 
-
 ## Logging
 
 To change the log level, you need to change both `gear.conf`, and `log4j.properties`. 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/deployment-msg-delivery.md
----------------------------------------------------------------------
diff --git a/docs/deployment-msg-delivery.md b/docs/deployment-msg-delivery.md
index ff6de1f..10208ba 100644
--- a/docs/deployment-msg-delivery.md
+++ b/docs/deployment-msg-delivery.md
@@ -3,10 +3,9 @@ layout: global
 title: Deployment for Reliable Message Delivery
 ---
 
-
 ## How to deploy for At Least Once Message Delivery?
 
-As introduced in the [What is At Least Once Message Delievery](message-delivery.html#what-is-at-least-once-message-delivery), Gearpump has a built in KafkaSource. To get at least once message delivery, users should deploy a Kafka cluster as the offset store along with the Gearpump cluster. 
+As introduced in [What is At Least Once Message Delivery](message-delivery.html#what-is-at-least-once-message-delivery), Gearpump has a built-in KafkaSource. To get at least once message delivery, users should deploy a Kafka cluster as the offset store along with the Gearpump cluster. 
 
 Here's an example to deploy a local Kafka cluster. 
 
@@ -31,7 +30,6 @@ Here's an example to deploy a local Kafka cluster.
    val source = new KafkaSource("topic1", "localhost:2181", offsetStorageFactory)
    ```
 
-
 ## How to deploy for Exactly Once Message Delivery?
 
 Exactly Once Message Delivery requires both an offset store and a checkpoint store. For the offset store, a Kafka cluster should be deployed as in the previous section. As for the checkpoint store, Gearpump has built-in support for Hadoop file systems, like HDFS. Hence, users should deploy a HDFS cluster alongside the Gearpump cluster. 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/deployment-resource-isolation.md
----------------------------------------------------------------------
diff --git a/docs/deployment-resource-isolation.md b/docs/deployment-resource-isolation.md
index 780dd60..0cb3764 100644
--- a/docs/deployment-resource-isolation.md
+++ b/docs/deployment-resource-isolation.md
@@ -9,7 +9,7 @@ cgroups (abbreviated from control groups) is a Linux kernel feature to limit, ac
 
 ## Start CGroup Service 
 
-Cgroups feature is only supported by Linux whose kernel version is larger than 2.6.18. Please also make sure the SELinux is disabled before start CGroup.
+The CGroup feature is only supported on Linux with a kernel version higher than 2.6.18. Please also make sure SELinux is disabled before starting CGroup.
 
 The following steps are supposed to be executed by root user.
 
@@ -30,7 +30,7 @@ The following steps are supposed to be executed by root user.
    perf_event /sys/fs/cgroup/perf_event
    ```
    
-3. If you want to assign permission to user **gear** to launch Gearpump Worker and applications with resouce isolation enabled, you need to check gear's uid and gid in /etc/passwd file, let's take **500** for example.
+3. If you want to assign permission to user **gear** to launch Gearpump Worker and applications with resource isolation enabled, you need to check gear's uid and gid in /etc/passwd file, let's take **500** for example.
 
 4. Add following content to /etc/cgconfig.conf
     
@@ -71,7 +71,7 @@ The following steps are supposed to be executed by root user.
    
 5. There should be a folder **gearpump** generated under the mount point of cpu subsystem and its owner is **gear:gear**.  
   
-6. Repeat the above-mentioned steps on each machine where you want to lauch Gearpump.   
+6. Repeat the above-mentioned steps on each machine where you want to launch Gearpump.   
 
 ## Enable Cgroups in Gearpump 
 1. Login into the machine which has CGroup prepared with user **gear**.
@@ -90,11 +90,11 @@ The following steps are supposed to be executed by root user.
 
    Please note the gearpump.cgroup.root **gearpump** must be consistent with the group name in /etc/cgconfig.conf.
 
-3. Repeat the above-mentioned steps on each machine where you want to lauch Gearpump
+3. Repeat the above-mentioned steps on each machine where you want to launch Gearpump
 
 4. Start the Gearpump cluster, please refer to [Deploy Gearpump in Standalone Mode](deployment-standalone.html)
 
-## Launch Application From Commad Line
+## Launch Application From Command Line
 1. Login into the machine which has Gearpump distribution.
 
 2. Enter into Gearpump's home folder, edit gear.conf under folder ```${GEARPUMP_HOME}/conf/```
@@ -117,4 +117,4 @@ The following steps are supposed to be executed by root user.
 If you want to submit the application from dashboard, by default the ```gearpump.cgroup.cpu-core-limit-per-executor``` is inherited from Worker's configuration. You can provide your own conf file to override it.
 
 ## Limitations
-Windows and Mac OS X don't support CGroup, so the resource isolation will not work even if you turn it on. There will not be any limitition for single executor's cpu usage.
\ No newline at end of file
+Windows and Mac OS X don't support CGroup, so the resource isolation will not work even if you turn it on. There will not be any limitation for single executor's cpu usage.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/deployment-security.md
----------------------------------------------------------------------
diff --git a/docs/deployment-security.md b/docs/deployment-security.md
index 4be1346..9ce3715 100644
--- a/docs/deployment-security.md
+++ b/docs/deployment-security.md
@@ -36,7 +36,7 @@ Suppose user `gear` will launch gearpump on YARN, then the corresponding princip
   Before calling "yarnclient launch", make sure you have put all yarn configuration files under classpath.
   Typically, you can just copy all files under `$HADOOP_HOME/etc/hadoop` from one of the YARN cluster machine to `conf/yarnconf` of gearpump.
   `$HADOOP_HOME` points to the Hadoop installation directory. 
-5. Get Kerberos credentials to sumbit the job:
+5. Get Kerberos credentials to submit the job:
 
    ```
    kinit gearpump/fully.qualified.domain.name@YOUR-REALM.COM
@@ -79,7 +79,7 @@ Since Gearpump’s Master-Worker structure is similar to HDFS’s NameNode-DataN
 
 1. User creates kerberos principal and keytab for Gearpump.
 2. Deploy the keytab files to all the cluster nodes.
-3. Configure Gearpump’s conf file, specify kerberos principal and local keytab file localtion.
+3. Configure Gearpump’s conf file, specify kerberos principal and local keytab file location.
 4. Start Master and Worker.
 
 Every application has a submitter/user. We will separate the application from different users, like different log folders for different applications. 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/deployment-standalone.md
----------------------------------------------------------------------
diff --git a/docs/deployment-standalone.md b/docs/deployment-standalone.md
index 06e701c..d77ffc8 100644
--- a/docs/deployment-standalone.md
+++ b/docs/deployment-standalone.md
@@ -9,7 +9,6 @@ Standalone mode is a distributed cluster mode. That is, Gearpump runs as service
 
 To deploy Gearpump in cluster mode, please first check that the [Pre-requisites](hardware-requirement.html) are met.
 
-
 ### How to Install
 You need to have Gearpump binary at hand. Please refer to [How to get gearpump distribution](get-gearpump-distribution.html) to get the Gearpump binary.
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/deployment-ui-authentication.md
----------------------------------------------------------------------
diff --git a/docs/deployment-ui-authentication.md b/docs/deployment-ui-authentication.md
index 04a9c7b..872a857 100644
--- a/docs/deployment-ui-authentication.md
+++ b/docs/deployment-ui-authentication.md
@@ -5,7 +5,6 @@ title: UI Dashboard Authentication and Authorization
 
 ## What is this about?
 
-
 ## How to enable UI authentication?
 
 1. Change config file gear.conf, find entry `gearpump-ui.gearpump.ui-security.authentication-enabled`, change the value to true
@@ -48,7 +47,6 @@ is the steps to configure ConfigFileBasedAuthenticator.
 
 For the default authentication plugin, it has three categories of users: admins, users, and guests.
 
-
 * admins: have unlimited permission, like shutdown a cluster, add/remove machines.
 * users: have limited permission to submit an application and etc..
 * guests: can not submit/kill applications, but can view the application status.
@@ -245,7 +243,7 @@ To use Google OAuth2 Authenticator, there are several steps:
 3. Config gear.conf `gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa` section.
 Please make sure class name, client ID, client Secret, and callback URL are set properly.
 
-**NOTE:** The callback URL here should matche what you set on CloudFoundry UAA in step1.
+**NOTE:** The callback URL here should match what you set on CloudFoundry UAA in Step 1.
 
 #### Step3: Configure network proxy for Gearpump UI server if applies
 
@@ -286,7 +284,7 @@ You can follow the Google OAuth2 example code to define a custom OAuth2Authentic
     * parameters and form fields.
     *
     * @note '''Thread-safety''' is a MUST requirement. Developer need to ensure the sub-class is thread-safe.
-    * Sub-class should have a parameterless constructor.
+    * Sub-class should have a parameter-less constructor.
     *
     * @note OAuth2 Authenticator requires access of Internet. Please make sure HTTP proxy are
     * set properly if applied.
@@ -374,4 +372,3 @@ You can follow the Google OAuth2 example code to define a custom OAuth2Authentic
    ```
    The configuration entry is supposed to be used by class `SocialNetworkXAuthenticator`.
 
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/dev-connectors.md
----------------------------------------------------------------------
diff --git a/docs/dev-connectors.md b/docs/dev-connectors.md
index a9983fc..08ff023 100644
--- a/docs/dev-connectors.md
+++ b/docs/dev-connectors.md
@@ -9,7 +9,7 @@ title: Gearpump Connectors
 ### DataSource
 `DataSource` is the concept in Gearpump that without input and will output messages. So, basically, `DataSource` is the start point of a streaming processing flow.
 
-As Gearpump depends on `DataSource` to be replayable to ensure at-least-once message delivery and exactly-once message delivery, for some data sources, we will need a `io.gearpump.streaming.transaction.api.OffsetStorageFactory` to store the offset (progress) of current `DataSource`. So that, when a replay is needed, Gearpump can guide `DataSource` to replay from certain offset.
+As Gearpump depends on `DataSource` being replay-able to ensure at-least-once and exactly-once message delivery, for some data sources we need an `io.gearpump.streaming.transaction.api.OffsetStorageFactory` to store the offset (progress) of the current `DataSource`, so that when a replay is needed, Gearpump can guide the `DataSource` to replay from a certain offset.
 
 Currently Gearpump `DataSource` only support infinite stream. Finite stream support will be added in a near future release.
 
@@ -34,7 +34,6 @@ Name | Description
 `HBaseSink` | Write the message to HBase. The message to write must be HBase `Put` or a tuple of `(rowKey, family, column, value)`.
 `KafkaSink` | Write to Kafka.
 
-
 ## Use of Connectors
 
 ### Use of `KafkaSource`
@@ -88,7 +87,6 @@ Then, you can use `KafkaSource` in your application:
 
 To use `HBaseSink` in your application, you first need to add the `gearpump-external-hbase` library dependency in your application:
 
-
 ```
 "com.github.intel-hadoop" %% "gearpump-external-hbase" % {{ site.GEARPUMP_VERSION }}
 ```
@@ -101,7 +99,6 @@ To use `HBaseSink` in your application, you first need to add the `gearpump-exte
 </dependency>
 ```
 
-
 To connect to HBase, you need to provide following info:
  - the HBase configuration to tell which HBase service to connect
  - the table name (you must create the table yourself, see the [HBase documentation](https://hbase.apache.org/book.html))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/dev-custom-serializer.md
----------------------------------------------------------------------
diff --git a/docs/dev-custom-serializer.md b/docs/dev-custom-serializer.md
index ca0a332..5be0544 100644
--- a/docs/dev-custom-serializer.md
+++ b/docs/dev-custom-serializer.md
@@ -41,7 +41,6 @@ but rather
    import io.gearpump.google.common.io.Files
 ```
 
-
 ##### System Level Serializer
 
 If the serializer is widely used, you can define a global serializer which is available to all applications(or worker or master) in the system.
@@ -128,7 +127,6 @@ gearpump {
 
 ###### Step3: All set!
 
-
 #### Advanced: Choose another serialization framework
 
 Note: This is only for advanced user which require deep customization of Gearpump platform.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/dev-non-streaming-example.md
----------------------------------------------------------------------
diff --git a/docs/dev-non-streaming-example.md b/docs/dev-non-streaming-example.md
index eac92d0..1aec2d1 100644
--- a/docs/dev-non-streaming-example.md
+++ b/docs/dev-non-streaming-example.md
@@ -11,7 +11,6 @@ What Distributed Shell do is that user send a shell command to the cluster and t
 
 Repository and library dependencies can be found at [Maven Setting](maven-setting.html)
 
-
 ### Define Executor Class
 
 ```scala

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/dev-rest-api.md
----------------------------------------------------------------------
diff --git a/docs/dev-rest-api.md b/docs/dev-rest-api.md
index 7b2bb6f..6243a89 100644
--- a/docs/dev-rest-api.md
+++ b/docs/dev-rest-api.md
@@ -579,8 +579,6 @@ Sample Response:
 {"enabled":true}
 ```
 
-
-
 ### GET api/v1.0/supervisor
 Get the supervisor path
 
@@ -616,7 +614,6 @@ Sample Response:
 ### POST api/v1.0/supervisor/removeworker/&lt;worker-id&gt;
 Remove single worker instance by specifying a worker Id.
 
-
 **NOTE:* Use with caution!
 
 **NOTE:** All executors JVMs under this worker JVM will also be destroyed. It will trigger failover for all
@@ -637,7 +634,6 @@ Sample Response:
 
 ## Application service
 
-
 ### GET api/v1.0/appmaster/&lt;appId&gt;?detail=&lt;true|false&gt;
 Query information of an specific application of Id appId
 
@@ -793,7 +789,6 @@ Sample Response:
 }
 ```
 
-
 ### GET api/v1.0/appmaster/&lt;appId&gt;/config
 Query the configuration of specific application appId
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/dev-storm.md
----------------------------------------------------------------------
diff --git a/docs/dev-storm.md b/docs/dev-storm.md
index 914aabe..129f38e 100644
--- a/docs/dev-storm.md
+++ b/docs/dev-storm.md
@@ -4,7 +4,7 @@ title: Storm Compatibility
 ---
 
 Gearpump provides **binary compatibility** for Apache Storm applications. That is to say, users could easily grab an existing Storm jar and run it 
-on Gearpump. This documentation illustrates Gearpump's comapatibility with Storm.  
+on Gearpump. This documentation illustrates Gearpump's compatibility with Storm.  
 
 ## What Storm features are supported on Gearpump 
 
@@ -32,7 +32,7 @@ on Gearpump. This documentation illustrates Gearpump's comapatibility with Storm
 | storm-jdbc | yes |
 | storm-redis | yes |
 | flux | yes |
-| storm-eventhubs | not verfied |
+| storm-eventhubs | not verified |
 | Trident | no |
 
 ### At Least Once support
@@ -95,7 +95,6 @@ This section shows how to run an existing Storm jar in a local Gearpump cluster.
    
    a. submit Storm applications through command line
 
-
      ```
      bin/storm app -verbose -config app.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation 
      ```
@@ -138,14 +137,14 @@ Here's an example of `WordCountTopology` with acker bolts (ackers) being transla
 
 Gearpump creates a `StormProducer` for each Storm spout and a `StormProcessor` for each Storm bolt (except for ackers) with the same parallelism, and wires them together using the same grouping strategy (partitioning in Gearpump) as in Storm. 
 
-At runtime, spouts and bolts are running inside `StormProducer` tasks and `StormProcessor` tasks respectively. Messages emitted by spout are passed to `StormProducer`, transferred to `StormProcessor` and passed down to bolt.  Messages are serialized / deserialized with Storm serializers.
+At runtime, spouts and bolts are running inside `StormProducer` tasks and `StormProcessor` tasks respectively. Messages emitted by spout are passed to `StormProducer`, transferred to `StormProcessor` and passed down to bolt.  Messages are serialized / de-serialized with Storm serializers.
 
 Storm ackers are dropped since Gearpump has a different mechanism of message tracking and flow control. 
 
 ### Task execution
 
 Each Storm task is executed by a dedicated thread while all Gearpump tasks of an executor share a thread pool. Generally, we can achieve better performance with a shared thread pool. It's possible, however, some tasks block and take up all the threads. In that case, we can 
-fall back to the Storm way by setting `gearpump.task-dispatcher` to `"gaerpump.single-thread-dispatcher"` in `gear.conf`.
+fall back to the Storm way by setting `gearpump.task-dispatcher` to `"gearpump.single-thread-dispatcher"` in `gear.conf`.
 
 ### Message tracking 
 
@@ -153,7 +152,6 @@ Storm tracks the lineage of each message with ackers to guarantee at-least-once
 
 Gearpump [tracks messages between a sender and receiver in an efficient way](gearpump-internals.html#how-do-we-detect-message-loss). Message loss causes the whole application to replay from the [minimum timestamp of all pending messages in the system](gearpump-internals.html#application-clock-and-global-clock-service). 
 
-
 ### Flow control
 
 Storm throttles flow rate at spout, which stops sending messages if the number of unacked messages exceeds `topology.max.spout.pending`. 
@@ -185,7 +183,6 @@ Since StreamCQL already supports Storm, it's straightforward to run StreamCQL ov
 
 3. Go to the installed stream-cql-binary, and change following settings in `conf/streaming-site.xml` with the output Nimbus configs in Step 2.
 
-
    ```xml
     <property>
       <name>streaming.storm.nimbus.host</name>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/dev-write-1st-app.md
----------------------------------------------------------------------
diff --git a/docs/dev-write-1st-app.md b/docs/dev-write-1st-app.md
index 738790b..1dd47b6 100644
--- a/docs/dev-write-1st-app.md
+++ b/docs/dev-write-1st-app.md
@@ -55,8 +55,6 @@ object Split {
 }
 ```
 
-
-
 Like Split, every processor extends a `TaskActor`.  The `onStart` method is called once before any message comes in; `onNext` method is called to process every incoming message. Note that Gearpump employs the message-driven model and that's why Split sends itself a message at the end of `onStart` and `onNext` to trigger next message processing.
 
 #### Sum Processor
@@ -157,13 +155,12 @@ object WordCount extends App with ArgumentsParser {
 
 We override `options` value and define an array of command line arguments to parse. We want application users to pass in masters' hosts and ports, the parallelism of split and sum tasks, and how long to run the example. We also specify whether an option is `required` and provide `defaultValue` for some arguments.
 
-Given the `ParseResult` of command line arguments, we create `TaskDescription`s for Split and Sum processors, and connect them with `HashPartitioner` using DAG API. The graph is wrapped in an `AppDescrition` , which is finally submit to master.
+Given the `ParseResult` of command line arguments, we create `TaskDescription`s for Split and Sum processors, and connect them with `HashPartitioner` using DAG API. The graph is wrapped in an `AppDescription` , which is finally submit to master.
 
 ### Submit application
 
 After all these, you need to package everything into a uber jar and submit the jar to Gearpump Cluster. Please check [Application submission tool](commandline.html) to command line tool syntax.
 
-
 ### Advanced topic
 For a real application, you definitely need to define your own customized message passing between processors.
 Customized message needs customized serializer to help message passing over wire.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/features.md
----------------------------------------------------------------------
diff --git a/docs/features.md b/docs/features.md
index 2bd9417..210ac7a 100644
--- a/docs/features.md
+++ b/docs/features.md
@@ -7,8 +7,7 @@ description: Gearpump Technical Highlights
 
 ### Technical highlights of Gearpump
 
-
-Gearpump is a performant, flexible, fault-tolerant, and responsive streaming platform with a lot of nice features, its technical highlights include:
+Gearpump is a high-performance, flexible, fault-tolerant, and responsive streaming platform with a lot of nice features. Its technical highlights include:
 
 #### Actors everywhere
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/gearpump-internals.md
----------------------------------------------------------------------
diff --git a/docs/gearpump-internals.md b/docs/gearpump-internals.md
index 9221157..06e045d 100644
--- a/docs/gearpump-internals.md
+++ b/docs/gearpump-internals.md
@@ -4,7 +4,7 @@ displayTitle: Gearpump Internals
 title: Gearpump Internals
 description: Gearpump Internals
 ---
-### Actor Hiearachy?
+### Actor Hierarchy?
 
 ![Actor Hierarchy](img/actor_hierarchy.png)
 
@@ -72,7 +72,7 @@ Without flow control, one task can easily flood another task with too many messa
 Figure: Flow control, each task is "star" connected to input tasks and output tasks
 
 The difficult part for our problem is that each task can have multiple input tasks and output tasks. The input and output must be geared together so that the back pressure can be properly propagated from downstream to upstream. The flow control also needs to consider failures, and it needs to be able to recover when there is message loss.
-Another challenge is that the overhead of flow control messages can be big. If we ack every message, there will be huge amount of ack'd messages in the system, degrading streaming performance. The approach we adopted is to use explicit AckRequest message. The target tasks will only ack back when they receive the AckRequest message, and the source will only send AckRequest when it feels necessary. With this approach, we can largely reduce the overhead.
+Another challenge is that the overhead of flow control messages can be big. If we ack every message, there will be a huge amount of acked messages in the system, degrading streaming performance. The approach we adopted is to use an explicit AckRequest message. The target tasks only ack back when they receive an AckRequest message, and the source only sends an AckRequest when it feels necessary. With this approach, we can largely reduce the overhead.
 
 ### How do we detect message loss?
 
@@ -231,4 +231,4 @@ When there is message loss, the AppMaster will first pause the global clock serv
 
 Kafka queue only expose the offset information for each partition. What KafkaSource do is to maintain its own mapping from Kafka offset to  Application timestamp, so that we can map from a application timestamp to a Kafka offset, and replay Kafka messages from that Kafka offset.
 
-The mapping between Application timestmap with Kafka offset is stored in a distributed file system or as a Kafka topic.
+The mapping between Application timestamp and Kafka offset is stored in a distributed file system or as a Kafka topic.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/hardware-requirement.md
----------------------------------------------------------------------
diff --git a/docs/hardware-requirement.md b/docs/hardware-requirement.md
index 87e87c5..42e839c 100644
--- a/docs/hardware-requirement.md
+++ b/docs/hardware-requirement.md
@@ -25,7 +25,6 @@ CPU	| Nothing special
 HDFS installation	| Default is not required. You only need to install it when you want to store the application jars in HDFS.
 Kafka installation |	Default is not required. You need to install Kafka when you want the at-least once message delivery feature. Currently, the only supported data source for this feature is Kafka
 
-
 **  Table: The default port used in Gearpump:**
 
 | usage	| Port |	Description |

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/how-to-contribute.md
----------------------------------------------------------------------
diff --git a/docs/how-to-contribute.md b/docs/how-to-contribute.md
index 4c15aaf..e9e33c1 100644
--- a/docs/how-to-contribute.md
+++ b/docs/how-to-contribute.md
@@ -3,7 +3,6 @@ layout: global
 title: How to contribute
 ---
 
-
 ## Contributions Welcome!
 Gearpump is developed by an open and friendly community. Everybody is cordially welcome to join the community and contribute to Gearpump. There are several ways to interact with the community and to contribute to Gearpump including asking questions, filing bug reports, implementing new use cases, proposing new features, joining discussions on the mailing lists, contributing code or documentation, improving the website, or testing release candidates.
 
@@ -54,7 +53,6 @@ We welcome any contribution to improve our website.
 
 Please open an issue at [Gearpump Website Issue Tracker](https://github.com/gearpump/gearpump.github.io/issues) if you think our website could be improved.
 
-
 ### More ways to contribute…
 
 There are many more ways to contribute to the Gearpump community. For example you can

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 75572c5..d24f3e4 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -27,12 +27,11 @@ Gearpump's feature set includes:
 * Samoa compatibility
 * Both high level and low level API
 
-
 ### Gearpump Performance
 Per initial benchmarks we are able to process 18 million messages/second (100 bytes per message) with a 8ms latency on a 4-node cluster.
 
 ![Dashboard](img/dashboard.png)
 
 ### Gearpump and Akka
-Gearump is a 100% Akka based platform. We model big data streaming within the Akka actor hierarchy.
+Gearpump is a 100% Akka based platform. We model big data streaming within the Akka actor hierarchy.
 ![Actor Hierarchy](img/actor_hierarchy.png)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/js/api-docs.js
----------------------------------------------------------------------
diff --git a/docs/js/api-docs.js b/docs/js/api-docs.js
index ce89d89..864502c 100644
--- a/docs/js/api-docs.js
+++ b/docs/js/api-docs.js
@@ -17,7 +17,7 @@
 
 /* Dynamically injected post-processing code for the API docs */
 
-$(document).ready(function() {
+$(document).ready(function () {
   var annotations = $("dt:contains('Annotations')").next("dd").children("span.name");
   addBadges(annotations, "AlphaComponent", ":: AlphaComponent ::", '<span class="alphaComponent badge">Alpha Component</span>');
   addBadges(annotations, "DeveloperApi", ":: DeveloperApi ::", '<span class="developer badge">Developer API</span>');
@@ -29,7 +29,7 @@ function addBadges(allAnnotations, name, tag, html) {
   var tags = $(".cmt:contains(" + tag + ")")
 
   // Remove identifier tags from comments
-  tags.each(function(index) {
+  tags.each(function (index) {
     var oldHTML = $(this).html();
     var newHTML = oldHTML.replace(tag, "");
     $(this).html(newHTML);

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/js/api-javadocs.js
----------------------------------------------------------------------
diff --git a/docs/js/api-javadocs.js b/docs/js/api-javadocs.js
index ead13d6..c4eb8a6 100644
--- a/docs/js/api-javadocs.js
+++ b/docs/js/api-javadocs.js
@@ -17,7 +17,7 @@
 
 /* Dynamically injected post-processing code for the API docs */
 
-$(document).ready(function() {
+$(document).ready(function () {
   addBadges(":: AlphaComponent ::", '<span class="alphaComponent badge">Alpha Component</span>');
   addBadges(":: DeveloperApi ::", '<span class="developer badge">Developer API</span>');
   addBadges(":: Experimental ::", '<span class="experimental badge">Experimental</span>');
@@ -27,24 +27,24 @@ function addBadges(tag, html) {
   var tags = $(".block:contains(" + tag + ")")
 
   // Remove identifier tags
-  tags.each(function(index) {
+  tags.each(function (index) {
     var oldHTML = $(this).html();
     var newHTML = oldHTML.replace(tag, "");
     $(this).html(newHTML);
   });
 
   // Add html badge tags
-  tags.each(function(index) {
+  tags.each(function (index) {
     if ($(this).parent().is('td.colLast')) {
       $(this).parent().prepend(html);
     } else if ($(this).parent('li.blockList')
-                      .parent('ul.blockList')
-                      .parent('div.description')
-                      .parent().is('div.contentContainer')) {
+        .parent('ul.blockList')
+        .parent('div.description')
+        .parent().is('div.contentContainer')) {
       var contentContainer = $(this).parent('li.blockList')
-                                    .parent('ul.blockList')
-                                    .parent('div.description')
-                                    .parent('div.contentContainer')
+        .parent('ul.blockList')
+        .parent('div.description')
+        .parent('div.contentContainer')
       var header = contentContainer.prev('div.header');
       if (header.length > 0) {
         header.prepend(html);

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/js/main.js
----------------------------------------------------------------------
diff --git a/docs/js/main.js b/docs/js/main.js
index 0bac247..ae989af 100644
--- a/docs/js/main.js
+++ b/docs/js/main.js
@@ -25,7 +25,7 @@ function codeTabs() {
     "python": "img/python-sm.png",
     "java": "img/java-sm.png"
   };
-  $("div.codetabs").each(function() {
+  $("div.codetabs").each(function () {
     $(this).addClass("tab-content");
 
     // Insert the tab bar
@@ -34,7 +34,7 @@ function codeTabs() {
 
     // Add each code sample to the tab bar:
     var codeSamples = $(this).children("div");
-    codeSamples.each(function() {
+    codeSamples.each(function () {
       $(this).addClass("tab-pane");
       var lang = $(this).data("lang");
       var image = $(this).data("image");
@@ -44,7 +44,7 @@ function codeTabs() {
       var id = "tab_" + lang + "_" + counter;
       $(this).attr("id", id);
       if (image != null && langImages[lang]) {
-        var buttonLabel = "<img src='" +langImages[lang] + "' alt='" + capitalizedLang + "' />";
+        var buttonLabel = "<img src='" + langImages[lang] + "' alt='" + capitalizedLang + "' />";
       } else if (notabs == null) {
         var buttonLabel = "<b>" + capitalizedLang + "</b>";
       } else {
@@ -67,12 +67,11 @@ function codeTabs() {
     $(this).tab('show');
     $(document).scrollTop($(this).offset().top - scrollOffset);
   });
-  $("table").each(function() {
+  $("table").each(function () {
     $(this).addClass("table table-bordered");
   });
 }
 
-
 // A script to fix internal hash links because we have an overlapping top bar.
 // Based on https://github.com/twitter/bootstrap/issues/193#issuecomment-2281510
 function maybeScrollToHash() {
@@ -82,7 +81,7 @@ function maybeScrollToHash() {
   }
 }
 
-$(function() {
+$(function () {
   codeTabs();
   // Display anchor links when hovering over headers. For documentation of the
   // configuration options, see the AnchorJS documentation.
@@ -91,11 +90,15 @@ $(function() {
   };
   anchors.add();
 
-  $(window).bind('hashchange', function() {
+  $(window).bind('hashchange', function () {
     maybeScrollToHash();
   });
 
   // Scroll now too in case we had opened the page on a hash, but wait a bit because some browsers
   // will try to do *their* initial scroll after running the onReady handler.
-  $(window).load(function() { setTimeout(function() { maybeScrollToHash(); }, 25); }); 
+  $(window).load(function () {
+    setTimeout(function () {
+      maybeScrollToHash();
+    }, 25);
+  });
 });

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/message-delivery.md
----------------------------------------------------------------------
diff --git a/docs/message-delivery.md b/docs/message-delivery.md
index 781bfd5..909574f 100644
--- a/docs/message-delivery.md
+++ b/docs/message-delivery.md
@@ -3,16 +3,12 @@ layout: global
 title: Reliable Message Delivery
 ---
 
-
-
 ## What is At Least Once Message Delivery?
 
-Messages could be lost on delivery due to network partitions. **At Least Once Message Delivery** (at least once) means the lost messages are delivered one or more times such that at least one is processed and acked by the whole flow. 
-
-Gearpump guarantees at least once for any source that is able to replay message from a past timestamp. In Gearpump, each message is tagged with a timestamp, and the system tracks the minimum timestamp of all pending messages (the global minimum clock). On message loss, application will be restarted to the global minimum clock. Since the source is able to replay from the gloabl minimum clock, all pending messages before the restart will be replayed. Gearpump calls that kind of source `TimeReplayableSource` and already provides a built in
-[KafkaSource](gearpump-internals.html#at-least-once-message-delivery-and-kafka). With the KafkaSource to ingest data into Gearpump, users are guaranteed at least once message delievery.
-
+Messages could be lost on delivery due to network partitions. **At Least Once Message Delivery** (at least once) means the lost messages are delivered one or more times such that at least one is processed and acknowledged by the whole flow. 
 
+Gearpump guarantees at least once for any source that is able to replay messages from a past timestamp. In Gearpump, each message is tagged with a timestamp, and the system tracks the minimum timestamp of all pending messages (the global minimum clock). On message loss, the application will be restarted to the global minimum clock. Since the source is able to replay from the global minimum clock, all pending messages before the restart will be replayed. Gearpump calls that kind of source `TimeReplayableSource` and already provides a built-in
+[KafkaSource](gearpump-internals.html#at-least-once-message-delivery-and-kafka). With the KafkaSource ingesting data into Gearpump, users are guaranteed at least once message delivery.
 
 ## What is Exactly Once Message Delivery?
 
@@ -24,7 +20,6 @@ Users are guaranteed exactly once in Gearpump if they use both a `TimeReplayable
 ### Persistent API
 Persistent API consists of `PersistentTask` and `PersistentState`.
 
-
 Here is an example of using them to keep count of incoming messages.
 
 ```scala

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistShellAppMaster.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistShellAppMaster.scala b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistShellAppMaster.scala
index 8533a03..9b417c0 100644
--- a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistShellAppMaster.scala
+++ b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistShellAppMaster.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,20 +17,23 @@
  */
 package io.gearpump.examples.distributedshell
 
+import scala.concurrent.Future
+
 import akka.actor.{Deploy, Props}
 import akka.pattern.{ask, pipe}
 import akka.remote.RemoteScope
 import com.typesafe.config.Config
+import org.slf4j.Logger
+
 import io.gearpump.cluster.ClientToMaster.ShutdownApplication
 import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout}
 import io.gearpump.cluster.{AppDescription, AppMasterContext, ApplicationMaster, ExecutorContext}
-import DistShellAppMaster._
+import io.gearpump.examples.distributedshell.DistShellAppMaster._
 import io.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
-import org.slf4j.Logger
 
-import scala.concurrent.Future
+class DistShellAppMaster(appContext: AppMasterContext, app: AppDescription)
+  extends ApplicationMaster {
 
-class DistShellAppMaster(appContext : AppMasterContext, app : AppDescription) extends ApplicationMaster {
   import appContext._
   import context.dispatcher
   implicit val timeout = Constants.FUTURE_TIMEOUT
@@ -45,10 +48,11 @@ class DistShellAppMaster(appContext : AppMasterContext, app : AppDescription) ex
   override def receive: Receive = {
     case ExecutorSystemStarted(executorSystem, _) =>
       import executorSystem.{address, resource => executorResource, worker}
-      val executorContext = ExecutorContext(currentExecutorId, worker, appId, app.name, self, executorResource)
-      //start executor
+      val executorContext = ExecutorContext(currentExecutorId, worker, appId, app.name,
+        self, executorResource)
+      // Start executor
       val executor = context.actorOf(Props(classOf[ShellExecutor], executorContext, app.userConfig)
-          .withDeploy(Deploy(scope = RemoteScope(address))), currentExecutorId.toString)
+        .withDeploy(Deploy(scope = RemoteScope(address))), currentExecutorId.toString)
       executorSystem.bindLifeCycleWith(executor)
       currentExecutorId += 1
     case StartExecutorSystemTimeout =>
@@ -56,14 +60,17 @@ class DistShellAppMaster(appContext : AppMasterContext, app : AppDescription) ex
       masterProxy ! ShutdownApplication(appId)
       context.stop(self)
     case msg: ShellCommand =>
-      Future.fold(context.children.map(_ ? msg))(new ShellCommandResultAggregator) { (aggregator, response) =>
-        aggregator.aggregate(response.asInstanceOf[ShellCommandResult])
+      Future.fold(context.children.map(_ ? msg))(new ShellCommandResultAggregator) {
+        (aggregator, response) => {
+          aggregator.aggregate(response.asInstanceOf[ShellCommandResult])
+        }
       }.map(_.toString()) pipeTo sender
   }
 
   private def getExecutorJvmConfig: ExecutorSystemJvmConfig = {
     val config: Config = app.clusterConfig
-    val jvmSetting = Util.resolveJvmSetting(config.withFallback(context.system.settings.config)).executor
+    val jvmSetting = Util.resolveJvmSetting(config.withFallback(context.system.settings.config))
+      .executor
     ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs,
       appJar, username, config)
   }
@@ -83,6 +90,6 @@ object DistShellAppMaster {
       this
     }
 
-    override def toString() = result.toString()
+    override def toString(): String = result.toString()
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShell.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShell.scala b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShell.scala
index ac47740..b7b3eb0 100644
--- a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShell.scala
+++ b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShell.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,12 +17,14 @@
  */
 package io.gearpump.examples.distributedshell
 
-import io.gearpump.cluster.{Application, UserConfig, AppDescription}
+import org.slf4j.Logger
+
 import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ParseResult, CLIOption, ArgumentsParser}
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import io.gearpump.cluster.{Application, UserConfig}
 import io.gearpump.util.{AkkaApp, LogUtil}
-import org.slf4j.Logger
 
+/** Demo application to distribute and execute shell command on all machines of the cluster */
 object DistributedShell extends AkkaApp with ArgumentsParser {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
@@ -31,7 +33,8 @@ object DistributedShell extends AkkaApp with ArgumentsParser {
   override def main(akkaConf: Config, args: Array[String]): Unit = {
     LOG.info(s"Distributed shell submitting application...")
     val context = ClientContext(akkaConf)
-    val appId = context.submit(Application[DistShellAppMaster]("DistributedShell", UserConfig.empty))
+    val appId = context.submit(Application[DistShellAppMaster]("DistributedShell",
+    UserConfig.empty))
     context.close()
     LOG.info(s"Distributed Shell Application started with appId $appId !")
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShellClient.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShellClient.scala b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShellClient.scala
index feecd18..cd6f943 100644
--- a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShellClient.scala
+++ b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShellClient.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +18,19 @@
 package io.gearpump.examples.distributedshell
 
 import java.util.concurrent.TimeUnit
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.pattern.ask
+import org.slf4j.{Logger, LoggerFactory}
 
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import DistShellAppMaster.ShellCommand
-
-import akka.pattern.ask
+import io.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommand
 import io.gearpump.util.{AkkaApp, Constants}
-import org.slf4j.{LoggerFactory, Logger}
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
 
-object DistributedShellClient extends AkkaApp with ArgumentsParser  {
+/** Client to DistributedShell to input "shell command" */
+object DistributedShellClient extends AkkaApp with ArgumentsParser {
   implicit val timeout = Constants.FUTURE_TIMEOUT
   private val LOG: Logger = LoggerFactory.getLogger(getClass)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/ShellExecutor.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/ShellExecutor.scala b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/ShellExecutor.scala
index 712788c..2d0fd06 100644
--- a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/ShellExecutor.scala
+++ b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/ShellExecutor.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,18 @@
 
 package io.gearpump.examples.distributedshell
 
+import scala.sys.process._
+import scala.util.{Failure, Success, Try}
+
 import akka.actor.Actor
-import io.gearpump.cluster.{UserConfig, ExecutorContext}
-import DistShellAppMaster.{ShellCommandResult, ShellCommand}
-import io.gearpump.util.LogUtil
 import org.slf4j.Logger
 
-import scala.util.{Failure, Success, Try}
-import sys.process._
+import io.gearpump.cluster.{ExecutorContext, UserConfig}
+import io.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommand, ShellCommandResult}
+import io.gearpump.util.LogUtil
 
-class ShellExecutor(executorContext: ExecutorContext, userConf : UserConfig) extends Actor{
+/** Executor actor on remote machine */
+class ShellExecutor(executorContext: ExecutorContext, userConf: UserConfig) extends Actor {
   import executorContext._
   private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId)
 
@@ -35,7 +37,7 @@ class ShellExecutor(executorContext: ExecutorContext, userConf : UserConfig) ext
 
   override def receive: Receive = {
     case ShellCommand(command) =>
-      val process = Try(s"$command" !!)
+      val process = Try(s"$command".!!)
       val result = process match {
         case Success(msg) => msg
         case Failure(ex) => ex.getMessage

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
index ec6bba1..2d63734 100644
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
+++ b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,20 +17,24 @@
  */
 package io.gearpump.examples.distributedshell
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.ActorSystem
 import akka.testkit.{TestActorRef, TestProbe}
-import io.gearpump.WorkerId
-import io.gearpump.cluster.AppMasterToMaster.{RequestResource, GetAllWorkers, RegisterAppMaster}
+import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
+
+import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource}
 import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import io.gearpump.cluster.MasterToAppMaster.{ResourceAllocated, WorkerList, AppMasterRegistered}
+import io.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList}
 import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, AppMasterRuntimeEnvironment}
-import io.gearpump.cluster.scheduler.{ResourceAllocation, Relaxation, ResourceRequest, Resource}
+import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
+import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.util.ActorSystemBooter.RegisterActorSystem
 import io.gearpump.util.ActorUtil
-import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
 
-class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter{
+class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter {
   implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG)
   val mockMaster = TestProbe()(system)
   val mockWorker1 = TestProbe()(system)
@@ -46,27 +50,29 @@ class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter{
   "DistributedShell AppMaster" should {
     "launch one ShellTask on each worker" in {
       val appMasterInfo = AppMasterRuntimeInfo(appId, appName = appId.toString)
-      val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy, appMasterInfo)
+      val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar,
+        masterProxy, appMasterInfo)
       TestActorRef[DistShellAppMaster](
         AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext))
       mockMaster.expectMsgType[RegisterAppMaster]
       mockMaster.reply(AppMasterRegistered(appId))
-      //The DistributedShell AppMaster will ask for worker list
+      // The DistributedShell AppMaster asks for worker list from Master.
       mockMaster.expectMsg(GetAllWorkers)
       mockMaster.reply(WorkerList(workerList))
-      //After worker list is ready, DistributedShell AppMaster will request resouce on each worker
+      // After worker list is ready, DistributedShell AppMaster requests resource on each worker
       workerList.foreach { workerId =>
-        mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)))
+        mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId,
+          relaxation = Relaxation.SPECIFICWORKER)))
       }
-      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L)))))
+      mockMaster.reply(ResourceAllocated(
+        Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L)))))
       mockWorker1.expectMsgClass(classOf[LaunchExecutor])
       mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
     }
   }
 
   after {
-    system.shutdown()
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellClientSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
index adefa40..973b3b3 100644
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
+++ b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,19 +17,20 @@
  */
 package io.gearpump.examples.distributedshell
 
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
 import akka.testkit.TestProbe
-import io.gearpump.cluster.ClientToMaster.ResolveAppId
-import io.gearpump.cluster.MasterToClient.ResolveAppIdResult
-import io.gearpump.cluster.{TestUtil, MasterHarness}
-import DistShellAppMaster.ShellCommand
-import io.gearpump.util.{LogUtil, Constants, Util}
 import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 
-import scala.concurrent.Future
-
-import scala.util.{Try, Success}
+import io.gearpump.cluster.ClientToMaster.ResolveAppId
+import io.gearpump.cluster.MasterToClient.ResolveAppIdResult
+import io.gearpump.cluster.{MasterHarness, TestUtil}
+import io.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommand
+import io.gearpump.util.LogUtil
 
-class DistributedShellClientSpec extends PropSpec with Matchers with BeforeAndAfter with MasterHarness {
+class DistributedShellClientSpec
+  extends PropSpec with Matchers with BeforeAndAfter with MasterHarness {
 
   private val LOG = LogUtil.getLogger(getClass)
 
@@ -41,17 +42,19 @@ class DistributedShellClientSpec extends PropSpec with Matchers with BeforeAndAf
     shutdownActorSystem()
   }
 
-  override def config = TestUtil.DEFAULT_CONFIG
+  protected override def config = TestUtil.DEFAULT_CONFIG
 
   property("DistributedShellClient should succeed to submit application with required arguments") {
     val command = "ls /"
     val requiredArgs = Array("-appid", "0", "-command", command)
     val masterReceiver = createMockMaster()
 
-    assert(Try(DistributedShellClient.main(Array.empty[String])).isFailure, "missing required arguments, print usage")
-
+    assert(Try(DistributedShellClient.main(Array.empty[String])).isFailure,
+      "missing required arguments, print usage")
 
-    Future {DistributedShellClient.main(masterConfig, requiredArgs)}
+    Future {
+      DistributedShellClient.main(masterConfig, requiredArgs)
+    }
 
     masterReceiver.expectMsg(PROCESS_BOOT_TIME, ResolveAppId(0))
     val mockAppMaster = TestProbe()(getActorSystem)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala
index 2a369c4..6eeba58 100644
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala
+++ b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,19 +17,19 @@
  */
 package io.gearpump.examples.distributedshell
 
+import scala.concurrent.Future
+import scala.util.Success
+
 import com.typesafe.config.Config
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{TestUtil, MasterHarness}
-import io.gearpump.util.Util
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 
-import scala.util.{Try, Success}
-
-import scala.concurrent.Future
+import io.gearpump.cluster.ClientToMaster.SubmitApplication
+import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import io.gearpump.cluster.{MasterHarness, TestUtil}
 
-class DistributedShellSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+class DistributedShellSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
 
   before {
     startActorSystem()
@@ -46,7 +46,9 @@ class DistributedShellSpec extends PropSpec with PropertyChecks with Matchers wi
 
     val masterReceiver = createMockMaster()
 
-    Future{DistributedShell.main(masterConfig, requiredArgs)}
+    Future {
+      DistributedShell.main(masterConfig, requiredArgs)
+    }
 
     masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
     masterReceiver.reply(SubmitApplicationResult(Success(0)))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
index 6bb1105..d59981b 100644
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
+++ b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,8 +18,8 @@
 package io.gearpump.examples.distributedshell
 
 import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
-import DistShellAppMaster.{ShellCommandResultAggregator, ShellCommandResult}
 
+import io.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommandResult, ShellCommandResultAggregator}
 
 class ShellCommandResultAggregatorSpec extends WordSpec with Matchers with BeforeAndAfter {
   "ShellCommandResultAggregator" should {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
index d51880b..b301973 100644
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
+++ b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,18 +17,20 @@
  */
 package io.gearpump.examples.distributedshell
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.sys.process._
+import scala.util.{Failure, Success, Try}
+
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.TestProbe
-import io.gearpump.WorkerId
-import io.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommandResult
+import org.scalatest.{Matchers, WordSpec}
+
 import io.gearpump.cluster.appmaster.WorkerInfo
 import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig}
-import DistShellAppMaster.{ShellCommand, ShellCommandResult}
-import org.scalatest.{Matchers, WordSpec}
-
-import scala.sys.process._
-import scala.util.{Failure, Success, Try}
+import io.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommand, ShellCommandResult}
 
 class ShellExecutorSpec extends WordSpec with Matchers {
 
@@ -43,10 +45,12 @@ class ShellExecutorSpec extends WordSpec with Matchers {
       val mockMaster = TestProbe()(system)
       val worker = TestProbe()
       val workerInfo = WorkerInfo(workerId, worker.ref)
-      val executorContext = ExecutorContext(executorId, workerInfo, appId, appName, mockMaster.ref, resource)
-      val executor = system.actorOf(Props(classOf[ShellExecutor], executorContext, UserConfig.empty))
+      val executorContext = ExecutorContext(executorId, workerInfo, appId, appName,
+        mockMaster.ref, resource)
+      val executor = system.actorOf(Props(classOf[ShellExecutor], executorContext,
+        UserConfig.empty))
 
-      val process = Try(s"ls /" !!)
+      val process = Try(s"ls /".!!)
       val result = process match {
         case Success(msg) => msg
         case Failure(ex) => ex.getMessage
@@ -55,8 +59,8 @@ class ShellExecutorSpec extends WordSpec with Matchers {
       assert(mockMaster.receiveN(1).head.asInstanceOf[ShellCommandResult].equals(
         ShellCommandResult(executorId, result)))
 
-      system.shutdown()
-      system.awaitTermination()
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
     }
   }
 }
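
Two of the changes in this spec are API-level rather than pure formatting: the deprecated ActorSystem.shutdown()/awaitTermination() pair is replaced by the Akka 2.4-style terminate() plus an explicit wait on whenTerminated, and the postfix form `"ls /" !!` becomes the ordinary method call `"ls /".!!`, which avoids the postfixOps language feature. A small sketch of the shutdown idiom, assuming only a standalone actor system:

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import akka.actor.ActorSystem

object ShutdownSketch extends App {
  val system = ActorSystem("sketch")

  // terminate() is asynchronous and returns Future[Terminated];
  // whenTerminated exposes the same future and can be awaited to block until shutdown completes.
  system.terminate()
  Await.result(system.whenTerminated, Duration.Inf)
}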

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributeservice/README.md
----------------------------------------------------------------------
diff --git a/examples/distributeservice/README.md b/examples/distributeservice/README.md
index da5acac..82b3726 100644
--- a/examples/distributeservice/README.md
+++ b/examples/distributeservice/README.md
@@ -15,7 +15,7 @@ In order to run the example:
   -script ${Script_Path} -serviceName ${Service_Name} -target ${Target_Path} -Dkey1=value1 -Dkey2=value2
   ```<br>
   This command will distribute the service zip file(variable ```file```) to the target path(variable ```target```), then copy the script to
-  ```/etc/init.d``` on each machine and install this servcie named with ```serviceName```<br>
+  ```/etc/init.d``` on each machine and install this service named with ```serviceName```<br>
   Note that you can pass some variables when the script file is installed, for example, you can submit a script template with syntax like
   ```
   role=${${hostname}.role}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
index 4d3492a..a220dc6 100644
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
+++ b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,21 +18,22 @@
 package io.gearpump.experiments.distributeservice
 
 import java.io.File
+import scala.concurrent.Future
 
 import akka.actor.{Deploy, Props}
 import akka.pattern.{ask, pipe}
 import akka.remote.RemoteScope
 import com.typesafe.config.Config
+import org.slf4j.Logger
+
 import io.gearpump.cluster.ClientToMaster.ShutdownApplication
 import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout}
 import io.gearpump.cluster.{AppDescription, AppMasterContext, ApplicationMaster, ExecutorContext}
-import DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService}
+import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService}
 import io.gearpump.util._
-import org.slf4j.Logger
-
-import scala.concurrent.Future
 
-class DistServiceAppMaster(appContext : AppMasterContext, app : AppDescription) extends ApplicationMaster {
+class DistServiceAppMaster(appContext: AppMasterContext, app: AppDescription)
+  extends ApplicationMaster {
   import appContext._
   import context.dispatcher
   implicit val timeout = Constants.FUTURE_TIMEOUT
@@ -42,7 +43,7 @@ class DistServiceAppMaster(appContext : AppMasterContext, app : AppDescription)
 
   val rootDirectory = new File("/")
   val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
-  val server = context.actorOf(Props(classOf[FileServer], rootDirectory, host , 0))
+  val server = context.actorOf(Props(classOf[FileServer], rootDirectory, host, 0))
 
   override def preStart(): Unit = {
     LOG.info(s"Distribute Service AppMaster started")
@@ -54,10 +55,12 @@ class DistServiceAppMaster(appContext : AppMasterContext, app : AppDescription)
   override def receive: Receive = {
     case ExecutorSystemStarted(executorSystem, _) =>
       import executorSystem.{address, resource => executorResource, worker}
-      val executorContext = ExecutorContext(currentExecutorId, worker, appId, app.name, self, executorResource)
-      //start executor
-      val executor = context.actorOf(Props(classOf[DistServiceExecutor], executorContext, app.userConfig)
-        .withDeploy(Deploy(scope = RemoteScope(address))), currentExecutorId.toString)
+      val executorContext = ExecutorContext(currentExecutorId, worker,
+        appId, app.name, self, executorResource)
+      // start executor
+      val executor = context.actorOf(Props(classOf[DistServiceExecutor],
+        executorContext, app.userConfig).withDeploy(
+        Deploy(scope = RemoteScope(address))), currentExecutorId.toString)
       executorSystem.bindLifeCycleWith(executor)
       currentExecutorId += 1
     case StartExecutorSystemTimeout =>
@@ -75,7 +78,8 @@ class DistServiceAppMaster(appContext : AppMasterContext, app : AppDescription)
 
   private def getExecutorJvmConfig: ExecutorSystemJvmConfig = {
     val config: Config = app.clusterConfig
-    val jvmSetting = Util.resolveJvmSetting(config.withFallback(context.system.settings.config)).executor
+    val jvmSetting = Util.resolveJvmSetting(
+      config.withFallback(context.system.settings.config)).executor
     ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs,
       appJar, username, config)
   }
@@ -87,10 +91,10 @@ object DistServiceAppMaster {
   case class FileContainer(url: String)
 
   case class InstallService(
-    url: String,
-    zipFileName: String,
-    targetPath: String,
-    script : Array[Byte],
-    serviceName: String,
-    serviceSettings: Map[String, Any])
+      url: String,
+      zipFileName: String,
+      targetPath: String,
+      script: Array[Byte],
+      serviceName: String,
+      serviceSettings: Map[String, Any])
 }
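
The ExecutorSystemStarted branch above starts the executor actor on the remote executor system by attaching a RemoteScope to the Props before actorOf. A compact sketch of that Akka remote-deployment pattern follows; the EchoActor class, system names, host and port are assumptions for illustration and not taken from the commit.

import akka.actor.{Actor, ActorSystem, Address, Deploy, Props}
import akka.remote.RemoteScope

class EchoActor extends Actor {
  override def receive: Receive = {
    case msg => sender() ! msg
  }
}

object RemoteDeploySketch extends App {
  // Assumes akka-remote is on the classpath and remoting is enabled in the configuration.
  val system = ActorSystem("appMasterSystem")

  // Address of an already-running remote executor system (hypothetical host and port).
  val executorSystemAddress = Address("akka.tcp", "executorSystem", "worker-host", 2552)

  // withDeploy(Deploy(scope = RemoteScope(...))) makes actorOf create the child on the
  // remote system instead of locally, as DistServiceAppMaster does for its executors.
  val executor = system.actorOf(
    Props(classOf[EchoActor]).withDeploy(Deploy(scope = RemoteScope(executorSystemAddress))),
    "executor-0")
}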

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala
index 7fc2a94..4a2a876 100644
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala
+++ b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,29 +17,30 @@
  */
 package io.gearpump.experiments.distributeservice
 
-import java.io.{FileWriter, File}
+import java.io.{File, FileWriter}
 import java.net.InetAddress
+import scala.collection.JavaConverters._
+import scala.io.Source
+import scala.sys.process._
+import scala.util.{Failure, Success, Try}
 
 import akka.actor.Actor
 import org.apache.commons.io.FileUtils
 import org.apache.commons.lang.text.StrSubstitutor
-import io.gearpump.cluster.{UserConfig, ExecutorContext}
-import DistServiceAppMaster.InstallService
-import io.gearpump.util.{ActorUtil, LogUtil}
 import org.slf4j.Logger
 
-import scala.io.Source
-import scala.sys.process._
-import collection.JavaConversions._
-import scala.util.{Failure, Success, Try}
+import io.gearpump.cluster.{ExecutorContext, UserConfig}
+import io.gearpump.experiments.distributeservice.DistServiceAppMaster.InstallService
+import io.gearpump.util.{ActorUtil, LogUtil}
 
-class DistServiceExecutor(executorContext: ExecutorContext, userConf : UserConfig) extends Actor {
+class DistServiceExecutor(executorContext: ExecutorContext, userConf: UserConfig) extends Actor {
   import executorContext._
   private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId)
 
   override def receive: Receive = {
     case InstallService(url, zipFileName, targetPath, scriptData, serviceName, serviceSettings) =>
-      LOG.info(s"Executor $executorId receive command to install service $serviceName to $targetPath")
+      LOG.info(s"Executor $executorId receive command to install " +
+        s"service $serviceName to $targetPath")
       unzipFile(url, zipFileName, targetPath)
       installService(scriptData, serviceName, serviceSettings)
   }
@@ -47,31 +48,32 @@ class DistServiceExecutor(executorContext: ExecutorContext, userConf : UserConfi
   private def unzipFile(url: String, zipFileName: String, targetPath: String) = {
     val zipFile = File.createTempFile(System.currentTimeMillis().toString, zipFileName)
     val dir = new File(targetPath)
-    if(dir.exists()) {
+    if (dir.exists()) {
       FileUtils.forceDelete(dir)
     }
     val bytes = FileServer.newClient.get(url).get
     FileUtils.writeByteArrayToFile(zipFile, bytes)
-    val result = Try(s"unzip ${zipFile.getAbsolutePath} -d $targetPath" !!)
+    val result = Try(s"unzip ${zipFile.getAbsolutePath} -d $targetPath".!!)
     result match {
       case Success(msg) => LOG.info(s"Executor $executorId unzip file to $targetPath")
-      case Failure(ex) =>  throw ex
+      case Failure(ex) => throw ex
     }
   }
 
-  private def installService(scriptData: Array[Byte], serviceName: String, serviceSettings: Map[String, Any]) = {
+  private def installService(
+      scriptData: Array[Byte], serviceName: String, serviceSettings: Map[String, Any]) = {
     val tempFile = File.createTempFile("gearpump", serviceName)
     FileUtils.writeByteArrayToFile(tempFile, scriptData)
     val script = new File("/etc/init.d", serviceName)
     writeFileWithEnvVariables(tempFile, script, serviceSettings ++ getEnvSettings)
-    val result = Try(s"chkconfig --add $serviceName" !!)
+    val result = Try(s"chkconfig --add $serviceName".!!)
     result match {
       case Success(msg) => LOG.info(s"Executor install service $serviceName successfully!")
       case Failure(ex) => throw ex
     }
   }
 
-  private def getEnvSettings : Map[String, Any] = {
+  private def getEnvSettings: Map[String, Any] = {
     Map("workerId" -> worker,
       "localhost" -> ActorUtil.getSystemAddress(context.system).host.get,
       "hostname" -> InetAddress.getLocalHost.getHostName)
@@ -79,7 +81,7 @@ class DistServiceExecutor(executorContext: ExecutorContext, userConf : UserConfi
 
   private def writeFileWithEnvVariables(source: File, target: File, envs: Map[String, Any]) = {
     val writer = new FileWriter(target)
-    val sub = new StrSubstitutor(mapAsJavaMap(envs))
+    val sub = new StrSubstitutor(envs.asJava)
     sub.setEnableSubstitutionInVariables(true)
     Source.fromFile(source).getLines().foreach(line => writer.write(sub.replace(line) + "\r\n"))
     writer.close()
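
Two behaviour-preserving updates above are worth spelling out: the implicit scala.collection.JavaConversions import is replaced by the explicit JavaConverters/.asJava conversion, and the commons-lang StrSubstitutor keeps nested substitution enabled so templates like role=${${hostname}.role} (see the distributeservice README) resolve in two steps. A small sketch with made-up values:

import scala.collection.JavaConverters._

import org.apache.commons.lang.text.StrSubstitutor

object SubstitutionSketch extends App {
  // Hypothetical environment map; DistServiceExecutor builds its map from the worker id,
  // local address and hostname plus the user-supplied service settings.
  val envs = Map("hostname" -> "node1", "node1.role" -> "master")

  val sub = new StrSubstitutor(envs.asJava)   // explicit conversion instead of JavaConversions
  sub.setEnableSubstitutionInVariables(true)  // allows nested variables like ${${hostname}.role}

  // Resolves ${hostname} to "node1" first, then ${node1.role} to "master".
  println(sub.replace("role=${${hostname}.role}"))  // prints: role=master
}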

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala
index d558b5d..522dc5e 100644
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala
+++ b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,15 @@
  */
 package io.gearpump.experiments.distributeservice
 
+import org.slf4j.Logger
+
 import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.{Application, AppJar, UserConfig, AppDescription}
-import io.gearpump.cluster.main.{ParseResult, CLIOption, ArgumentsParser}
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import io.gearpump.cluster.{Application, UserConfig}
 import io.gearpump.util.{AkkaApp, LogUtil}
-import org.slf4j.Logger
 
-object DistributeService extends AkkaApp with ArgumentsParser  {
+/** Demo app to remotely deploy and start system service on machines in the cluster */
+object DistributeService extends AkkaApp with ArgumentsParser {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
   override val options: Array[(String, CLIOption[Any])] = Array.empty
@@ -31,7 +33,8 @@ object DistributeService extends AkkaApp with ArgumentsParser  {
   override def main(akkaConf: Config, args: Array[String]): Unit = {
     LOG.info(s"Distribute Service submitting application...")
     val context = ClientContext(akkaConf)
-    val appId = context.submit(Application[DistServiceAppMaster]("DistributedService", UserConfig.empty))
+    val appId = context.submit(Application[DistServiceAppMaster]("DistributedService",
+      UserConfig.empty))
     context.close()
     LOG.info(s"Distribute Service Application started with appId $appId !")
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala
index 5f4ebbb..0d85001 100644
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala
+++ b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,31 +18,35 @@
 package io.gearpump.experiments.distributeservice
 
 import java.io.File
-import org.apache.commons.io.FileUtils
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{CLIOption, ArgumentsParser}
-import DistServiceAppMaster.{InstallService, FileContainer, GetFileContainer}
-import io.gearpump.util.{AkkaApp, LogUtil, FileServer, Constants}
-import org.slf4j.{LoggerFactory, Logger}
-
-import akka.pattern.ask
 import scala.concurrent.Future
 import scala.util.{Failure, Success}
 
-object DistributeServiceClient extends AkkaApp with ArgumentsParser{
+import akka.pattern.ask
+import org.apache.commons.io.FileUtils
+
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService}
+import io.gearpump.util.{AkkaApp, Constants}
+
+/** Client to submit the service jar */
+object DistributeServiceClient extends AkkaApp with ArgumentsParser {
   implicit val timeout = Constants.FUTURE_TIMEOUT
 
   override val options: Array[(String, CLIOption[Any])] = Array(
     "appid" -> CLIOption[Int]("<the distributed shell appid>", required = true),
     "file" -> CLIOption[String]("<service zip file path>", required = true),
-    "script" -> CLIOption[String]("<file path of service script that will be installed to /etc/init.d>", required = true),
+    "script" -> CLIOption[String](
+      "<file path of service script that will be installed to /etc/init.d>", required = true),
     "serviceName" -> CLIOption[String]("<service name>", required = true),
     "target" -> CLIOption[String]("<target path on each machine>", required = true)
   )
 
-  override def help : Unit = {
-    super.help
+  override def help(): Unit = {
+    super.help()
+    // scalastyle:off println
     Console.err.println(s"-D<name>=<value> set a property to the service")
+    // scalastyle:on println
   }
 
   override def main(akkaConf: Config, args: Array[String]): Unit = {
@@ -75,7 +79,7 @@ object DistributeServiceClient extends AkkaApp with ArgumentsParser{
   private def parseServiceConfig(args: Array[String]): Map[String, Any] = {
     val result = Map.empty[String, Any]
     args.foldLeft(result) { (result, argument) =>
-      if(argument.startsWith("-D") && argument.contains("=")) {
+      if (argument.startsWith("-D") && argument.contains("=")) {
         val fixedKV = argument.substring(2).split("=")
         result + (fixedKV(0) -> fixedKV(1))
       } else {
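
The parseServiceConfig helper at the end of this hunk folds the remaining command-line arguments into a map, keeping only -D<name>=<value> pairs as advertised in the help text. A self-contained sketch of that fold, detached from ArgumentsParser and with illustrative argument values:

object ServiceConfigSketch extends App {
  def parseServiceConfig(args: Array[String]): Map[String, Any] = {
    args.foldLeft(Map.empty[String, Any]) { (acc, argument) =>
      if (argument.startsWith("-D") && argument.contains("=")) {
        // Strip the leading "-D" and split on '=' into key and value.
        val fixedKV = argument.substring(2).split("=")
        acc + (fixedKV(0) -> fixedKV(1))
      } else {
        acc
      }
    }
  }

  // Prints Map(key1 -> value1, key2 -> value2); the non -D argument is ignored.
  println(parseServiceConfig(Array("-Dkey1=value1", "-Dkey2=value2", "-appid")))
}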

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala
index 893aa19..ed0b24d 100644
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala
+++ b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,21 +16,20 @@
  * limitations under the License.
  */
 
-
 package io.gearpump.experiments.distributeservice
 
 import java.io.File
+import scala.util.{Failure, Success, Try}
 
 import akka.actor.{Actor, Stash}
 import akka.io.IO
-import io.gearpump.util.LogUtil
 import org.apache.commons.httpclient.HttpClient
 import org.apache.commons.httpclient.methods.{ByteArrayRequestEntity, GetMethod, PostMethod}
 import spray.can.Http
 import spray.http.HttpMethods._
 import spray.http._
-import io.gearpump.util.FileUtils
-import scala.util.{Failure, Success, Try}
+
+import io.gearpump.util.{FileUtils, LogUtil}
 
 /**
  *
@@ -38,18 +37,18 @@ import scala.util.{Failure, Success, Try}
  *
  * port: set port to 0 if you want to bind to random port
  */
-class FileServer(rootDir: File, host: String, port : Int) extends Actor with Stash {
+class FileServer(rootDir: File, host: String, port: Int) extends Actor with Stash {
   private val LOG = LogUtil.getLogger(getClass)
 
   implicit val system = context.system
 
   override def preStart(): Unit = {
-    // create http server
+    // Creates http server
     IO(Http) ! Http.Bind(self, host, port)
   }
 
-  override def postStop() : Unit = {
-    //stop the server
+  override def postStop(): Unit = {
+    // Stop the server
     IO(Http) ! Http.Unbind
   }
 
@@ -62,14 +61,14 @@ class FileServer(rootDir: File, host: String, port : Int) extends Actor with Sta
       stash()
   }
 
-  def listen(port : Int) : Receive = {
+  def listen(port: Int): Receive = {
     case FileServer.GetPort => {
       sender ! FileServer.Port(port)
     }
     case Http.Connected(remote, _) =>
       sender ! Http.Register(self)
 
-    // fetch file from remote uri
+    // Fetches files from remote uri
     case HttpRequest(GET, uri, _, _, _) =>
       val child = uri.path.toString()
       val payload = Try {
@@ -83,8 +82,8 @@ class FileServer(rootDir: File, host: String, port : Int) extends Actor with Sta
           LOG.error("failed to get file " + ex.getMessage)
           sender ! HttpResponse(status = StatusCodes.InternalServerError, entity = ex.getMessage)
       }
-    //save file to remote uri
-    case post @ HttpRequest(POST, uri, _, _, _) =>
+    // Save file to remote uri
+    case post@HttpRequest(POST, uri, _, _, _) =>
       val child = uri.path.toString()
 
       val status = Try {
@@ -104,14 +103,14 @@ class FileServer(rootDir: File, host: String, port : Int) extends Actor with Sta
 
 object FileServer {
   object GetPort
-  case class Port(port : Int)
+  case class Port(port: Int)
 
-  def newClient = new Client
+  def newClient: Client = new Client
 
   class Client {
     val client = new HttpClient()
 
-    def save(uri : String, data : Array[Byte]) : Try[Int] = {
+    def save(uri: String, data: Array[Byte]): Try[Int] = {
       Try {
         val post = new PostMethod(uri)
         val entity = new ByteArrayRequestEntity(data)
@@ -120,7 +119,7 @@ object FileServer {
       }
     }
 
-    def get(uri : String) : Try[Array[Byte]] = {
+    def get(uri: String): Try[Array[Byte]] = {
       val get = new GetMethod(uri)
       val status = Try {
         client.executeMethod(get)