You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:49 UTC

[39/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala
index 91ba36e..638dc4e 100644
--- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala
+++ b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,51 +16,60 @@
  * limitations under the License.
  */
 
-
 package io.gearpump.streaming.examples.stock.main
 
 import akka.actor.ActorSystem
-import io.gearpump.streaming.{StreamApplication, Processor}
-import io.gearpump.streaming.examples.stock.{Crawler, QueryServer, Analyzer, StockMarket}
+import org.slf4j.Logger
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import io.gearpump.partitioner.HashPartitioner
-import StockMarket.ServiceHour
+import io.gearpump.streaming.examples.stock.StockMarket.ServiceHour
+import io.gearpump.streaming.examples.stock.{Analyzer, Crawler, QueryServer, StockMarket}
+import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.transport.HostPort
 import io.gearpump.util.Graph.Node
 import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-import org.slf4j.Logger
 
+/** Tracks changes in China's stock market index. */
 object Stock extends AkkaApp with ArgumentsParser {
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
   override val options: Array[(String, CLIOption[Any])] = Array(
-    "crawler"-> CLIOption[Int]("<how many fetcher to get data from remote>", required = false, defaultValue = Some(10)),
-    "analyzer"-> CLIOption[Int]("<parallism of analyzer>", required = false, defaultValue = Some(1)),
-    "proxy" -> CLIOption[String]("proxy setting host:port, for example: 127.0.0.1:8443", required = false, defaultValue = Some("")))
+    "crawler" -> CLIOption[Int]("<how many fetcher to get data from remote>",
+      required = false, defaultValue = Some(10)),
+    "analyzer" -> CLIOption[Int]("<parallism of analyzer>",
+      required = false, defaultValue = Some(1)),
+    "proxy" -> CLIOption[String]("proxy setting host:port, for example: 127.0.0.1:8443",
+      required = false, defaultValue = Some("")))
 
-  def crawler(config: ParseResult)(implicit system: ActorSystem) : StreamApplication = {
+  def crawler(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
     val crawler = Processor[Crawler](config.getInt("crawler"))
     val analyzer = Processor[Analyzer](config.getInt("analyzer"))
     val queryServer = Processor[QueryServer](1)
 
     val proxySetting = config.getString("proxy")
-    val proxy = if (proxySetting.isEmpty) {null } else HostPort(proxySetting)
+    val proxy = if (proxySetting.isEmpty) {
+      null
+    } else HostPort(proxySetting)
     val stockMarket = new StockMarket(new ServiceHour(true), proxy)
     val stocks = stockMarket.getStockIdList
 
+    // scalastyle:off println
     Console.println(s"Successfully fetched stock id for ${stocks.length} stocks")
+    // scalastyle:on println
 
-    val userConfig = UserConfig.empty.withValue("StockId", stocks).withValue[StockMarket](classOf[StockMarket].getName, stockMarket)
+    val userConfig = UserConfig.empty.withValue("StockId", stocks)
+      .withValue[StockMarket](classOf[StockMarket].getName, stockMarket)
     val partitioner = new HashPartitioner
 
     val p1 = crawler ~ partitioner ~> analyzer
     val p2 = Node(queryServer)
     val graph = Graph(p1, p2)
     val app = StreamApplication("stock_direct_analyzer", graph, userConfig
-      )
+    )
     app
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/README.md b/examples/streaming/transport/README.md
new file mode 100644
index 0000000..fc9bdfe
--- /dev/null
+++ b/examples/streaming/transport/README.md
@@ -0,0 +1,3 @@
+What is this?
+=============
+A smart transportation example that simulates a city with millions of cars.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/resources/transport/css/custom.css
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/custom.css b/examples/streaming/transport/src/main/resources/transport/css/custom.css
index 5b19ac7..f324b6a 100644
--- a/examples/streaming/transport/src/main/resources/transport/css/custom.css
+++ b/examples/streaming/transport/src/main/resources/transport/css/custom.css
@@ -1,16 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 .ui-datepicker {
   font-size: 11px;
-  }
+}
+
 .sidebar-label {
-  font-size:15px;
+  font-size: 15px;
   font-family: calibri, Arial, Helvetica, sans-serif;
 }
-          
+
 .help {
-  font-size:12px;
+  font-size: 12px;
   font-family: calibri, Arial, Helvetica, sans-serif;
 }
-          
+
 div.splitter {
   margin: 12px 0px 7px 0px;
   clear: both;
@@ -18,13 +37,13 @@ div.splitter {
 }
 
 input.sidebar {
-  width:165px
+  width: 165px
 }
 
 select.sidebar {
-  width:198px
+  width: 198px
 }
-          
+
 table.dataintable {
   font-family: calibri, Arial, Helvetica, sans-serif;
   font-size: 15px;
@@ -48,49 +67,49 @@ table.dataintable td {
   border: 1px solid #AAA;
 }
 
-#search{
-  width:100px;
-  height:25px;
-  position:relative;
-  left:0px;
-  top:5px;
+#search {
+  width: 100px;
+  height: 25px;
+  position: relative;
+  left: 0px;
+  top: 5px;
 }
 
-#mytable{
-  width:100%;
-  height:300;
-  float:left;
+#mytable {
+  width: 100%;
+  height: 300;
+  float: left;
 }
 
-#mychart{
-  height:400px;
-  width:100%;
+#mychart {
+  height: 400px;
+  width: 100%;
 }
 
-#Menu{
-  height:100%;
-  width:245px;
-  float:left;
+#Menu {
+  height: 100%;
+  width: 245px;
+  float: left;
 }
 
-#header{
-  height:115px;
+#header {
+  height: 115px;
   background-image: url(header.png);
 }
 
-#body{
-  height:100%;
-  width:100%;
+#body {
+  height: 100%;
+  width: 100%;
   background-image: url(body.png);
-  background-size:100% 100%;
+  background-size: 100% 100%;
 }
 
-#footer{
-  color:white;
-  height:70px;
-  line-height:70px;
-  text-align:middle;
-  clear:both;
-  text-align:center;
+#footer {
+  color: white;
+  height: 70px;
+  line-height: 70px;
+  text-align: middle;
+  clear: both;
+  text-align: center;
   background-image: url(foot.png);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/resources/transport/js/transport.js
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/js/transport.js b/examples/streaming/transport/src/main/resources/transport/js/transport.js
index 35d9ea2..eef0fe9 100644
--- a/examples/streaming/transport/src/main/resources/transport/js/transport.js
+++ b/examples/streaming/transport/src/main/resources/transport/js/transport.js
@@ -1,162 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 var myChart = echarts.init(document.getElementById("mychart"))
 
 echarts.util.mapData.params.params.football = {
-    getGeoJson: function (callback) {
-        $.ajax({
-            url: "../svg/beijing.svg",
-            dataType: 'xml',
-            success: function(xml) {
-                callback(xml)
-            }
-        });
-    }
+  getGeoJson: function (callback) {
+    $.ajax({
+      url: "../svg/beijing.svg",
+      dataType: 'xml',
+      success: function (xml) {
+        callback(xml)
+      }
+    });
+  }
 }
 
 function updateRecords(tableId) {
-   $.getJSON( "records", function( json ) {
-     var tableStr = "<table class=\"dataintable\" style=\"margin-left: 5px;\">";
-     tableStr += "<tr><th>Over Speed Vehicle ID</th><th>Speed</th><th>Location</th><th>Time</th></tr>";
-     var records = json.records;
-     for(var i = 0; i < Math.min(records.length, 20); i++) {
-       var record = records[i];
-       var vehicleId = record.vehicleId;
-       var location = record.locationId.split("_");
-       var speed = record.speed;
-       var row = location[1];
-       var column = location[2];
-       var time = new Date(Number(record.timestamp)).toLocaleTimeString().replace(/^\D*/,'');
-       tableStr += "<tr><td>" + vehicleId + "</td>";
-       tableStr += "<td>" + speed + "km/h </td>"
-       tableStr += "<td>(" + row + ", "+ column + ")</td>";
-       tableStr += "<td>" + time + "</td></tr>";
-     }
-     if(records.length < 20) {
-       for(var i = records.length; i < 20; i++) {
+  $.getJSON("records", function (json) {
+      var tableStr = "<table class=\"dataintable\" style=\"margin-left: 5px;\">";
+      tableStr += "<tr><th>Over Speed Vehicle ID</th><th>Speed</th><th>Location</th><th>Time</th></tr>";
+      var records = json.records;
+      for (var i = 0; i < Math.min(records.length, 20); i++) {
+        var record = records[i];
+        var vehicleId = record.vehicleId;
+        var location = record.locationId.split("_");
+        var speed = record.speed;
+        var row = location[1];
+        var column = location[2];
+        var time = new Date(Number(record.timestamp)).toLocaleTimeString().replace(/^\D*/, '');
+        tableStr += "<tr><td>" + vehicleId + "</td>";
+        tableStr += "<td>" + speed + "km/h </td>"
+        tableStr += "<td>(" + row + ", " + column + ")</td>";
+        tableStr += "<td>" + time + "</td></tr>";
+      }
+      if (records.length < 20) {
+        for (var i = records.length; i < 20; i++) {
           tableStr += "<tr><td></td>";
           tableStr += "<td> </td>"
           tableStr += "<td> </td>";
           tableStr += "<td> </td></tr>";
-       }
-     }
-     tableStr += "</table>"
-     document.getElementById(tableId).innerHTML = tableStr;
-   }
-   )
+        }
+      }
+      tableStr += "</table>"
+      document.getElementById(tableId).innerHTML = tableStr;
+    }
+  )
 }
 
 function initChart(chartid, vehicleId) {
-      // 基于准备好的dom,初始化echarts图表
-      $.getJSON( "trace/" + vehicleId, function( json ) {
-        // 为echarts对象加载数据
-        var records = json.records;
-        var timeLine = new Array(records.length);
-        var markPoints = new Array(records.length);
-        var options_ = new Array(records.length - 2);
-        for(var i = 0; i < records.length; i++) {
-          var record = records[i];
-          var vehicleId = record.vehicleId;
-          var location = record.locationId.split("_");
-          var row = location[1];
-          var column = location[2];
-          var time = new Date(Number(record.timeStamp)).toLocaleTimeString().replace(/^\D*/,'');
-          timeLine[i] = time;
-          var currentPonit = {name: "", value: i, geoCoord:[row * 90, column * 90]};
-          markPoints[i] = currentPonit;
+  // Initialize the echarts chart on the prepared DOM element
+  $.getJSON("trace/" + vehicleId, function (json) {
+    // Load the data into the echarts object
+    var records = json.records;
+    var timeLine = new Array(records.length);
+    var markPoints = new Array(records.length);
+    var options_ = new Array(records.length - 2);
+    for (var i = 0; i < records.length; i++) {
+      var record = records[i];
+      var vehicleId = record.vehicleId;
+      var location = record.locationId.split("_");
+      var row = location[1];
+      var column = location[2];
+      var time = new Date(Number(record.timeStamp)).toLocaleTimeString().replace(/^\D*/, '');
+      timeLine[i] = time;
+      var currentPonit = {name: "", value: i, geoCoord: [row * 90, column * 90]};
+      markPoints[i] = currentPonit;
+    }
+    options_[0] =
+    {
+      title: {
+        text: 'Vehicle trace'
+      },
+      tooltip: {
+        trigger: 'item'
+      },
+      toolbox: {
+        show: false,
+        feature: {
+          mark: {show: true},
+          dataView: {show: true, readOnly: false},
+          magicType: {show: true, type: ['line', 'bar']},
+          restore: {show: true},
+          saveAsImage: {show: true}
         }
-        options_[0] =
-          {
-            title : {
-              text: 'Vehicle trace'
-            },
-            tooltip : {
-              trigger: 'item'
-            },
-            toolbox: {
-              show : false,
-              feature : {
-                mark : {show: true},
-                dataView : {show: true, readOnly: false},
-                magicType : {show: true, type: ['line', 'bar']},
-                restore : {show: true},
-                saveAsImage : {show: true}
+      },
+      series: [
+        {
+          name: 'Vehicle trace',
+          type: 'map',
+          mapType: 'football',
+          mapLocation: {
+            y: 30,
+            height: 430
+          },
+          itemStyle: {
+            normal: {label: {show: false}},
+            emphasis: {label: {show: false}}
+          },
+          data: [
+            {name: 'City', hoverable: false, itemStyle: {normal: {label: {show: false}}}}
+          ],
+          markPoint: {
+            symbol: 'circle',
+            symbolSize: 8,
+            itemStyle: {
+              normal: {
+                borderWidth: 1,
+                color: 'blue',
+                lineStyle: {
+                  type: 'solid'
+                }
               }
             },
-            series : [
-              {
-                name: 'Vehicle trace',
-                type: 'map',
-                mapType: 'football',
-                mapLocation:{
-                  y: 30,
-                  height: 430
-                },
-                itemStyle:{
-                  normal:{label:{show:false}},
-                  emphasis:{label:{show:false}}
-                },
-                data:[
-                  {name: 'City', hoverable: false, itemStyle:{normal:{label:{show:false}}}}
-                ],
-                markPoint : {
-                  symbol:'circle',
-                  symbolSize : 8,
-                  itemStyle : {
-                    normal: {
-                      borderWidth:1,
-                      color: 'blue',
-                      lineStyle: {
-                        type: 'solid'
-                      }
-                    }
-                  },
-                  data: [markPoints[0]]
-                },
-                markLine : {
-                  smooth:true,
-                  effect : {
-                    show: true,
-                    scaleSize: 1.5,
-                    period: 1.5,
-                    color: '#fff'
-                  },
-                  itemStyle : {
-                    normal: {
-                      borderWidth:2,
-                      color: 'red',
-                      lineStyle: {
-                        type: 'solid'
-                      }
-                    }
-                  },
-                  data: []
+            data: [markPoints[0]]
+          },
+          markLine: {
+            smooth: true,
+            effect: {
+              show: true,
+              scaleSize: 1.5,
+              period: 1.5,
+              color: '#fff'
+            },
+            itemStyle: {
+              normal: {
+                borderWidth: 2,
+                color: 'red',
+                lineStyle: {
+                  type: 'solid'
                 }
               }
-            ]
+            },
+            data: []
           }
-        for(var i = 1; i < markPoints.length; i++){
-          options_[i] =
+        }
+      ]
+    }
+    for (var i = 1; i < markPoints.length; i++) {
+      options_[i] =
+      {
+        series: [
           {
-            series: [
-            {
-              markPoint : {
-                data: [markPoints[i]]
-              },
-              markLine : {
-                data: []
-              }
+            markPoint: {
+              data: [markPoints[i]]
+            },
+            markLine: {
+              data: []
             }
-            ]
           }
-        }
-        var option = {
-          timeline : {
-            type: 'number',
-            playInterval:500,
-            autoPlay:true,
-            data: timeLine
-          },
-          options: options_
-        };
-        myChart.setOption(option);
-      });
+        ]
+      }
+    }
+    var option = {
+      timeline: {
+        type: 'number',
+        playInterval: 500,
+        autoPlay: true,
+        data: timeLine
+      },
+      options: options_
+    };
+    myChart.setOption(option);
+  });
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/resources/transport/transport.html
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/transport.html b/examples/streaming/transport/src/main/resources/transport/transport.html
index fc17707..baee931 100644
--- a/examples/streaming/transport/src/main/resources/transport/transport.html
+++ b/examples/streaming/transport/src/main/resources/transport/transport.html
@@ -1,67 +1,88 @@
 <!DOCTYPE html>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
 <html>
 
 <head>
-    <meta charset="utf-8">
-    <link rel=stylesheet type=text/css href="css/custom.css">
-    <script src="http://echarts.baidu.com/build/source/echarts-all.js"></script>
-    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
+  <meta charset="utf-8">
+  <link rel=stylesheet type=text/css href="css/custom.css">
+  <script src="http://echarts.baidu.com/build/source/echarts-all.js"></script>
+  <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
 </head>
 
 <body style="background-color:#F2F2F2">
 <div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;">
-    <div style="height:0px"></div>
-    <div id="header">
-        <div style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
-            Big Data Transport Monitoring Demo
-        </div>
+  <div style="height:0px"></div>
+  <div id="header">
+    <div
+      style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
+      Big Data Transport Monitoring Demo
     </div>
-    <div id="body">
-        <div id="Menu">
-            <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;">
-                <!-- form to post to accompany to get accompanying cars -->
+  </div>
+  <div id="body">
+    <div id="Menu">
+      <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;">
+        <!-- form to post to accompany to get accompanying cars -->
 
-                <table style="width:100%">
-                    <tr>
-                        <td class="sidebar-label">Vehicle Id:</td>
-                    </tr>
-                    <tr>
-                        <td class="sidebar-label"></td>
-                    </tr>
-                    <tr>
-                        <td style="vertical-align:top;">
-                            <input id="vehicleId" class="sidebar" type="text" name="vehicleId"/>
-                        </td>
-                    </tr>
-                </table>
-                <div class="splitter"></div>
-                <div>
-                    <button id="search" onclick="search_onclick()">Search</button>
-                </div>
-            </div>
+        <table style="width:100%">
+          <tr>
+            <td class="sidebar-label">Vehicle Id:</td>
+          </tr>
+          <tr>
+            <td class="sidebar-label"></td>
+          </tr>
+          <tr>
+            <td style="vertical-align:top;">
+              <input id="vehicleId" class="sidebar" type="text" name="vehicleId"/>
+            </td>
+          </tr>
+        </table>
+        <div class="splitter"></div>
+        <div>
+          <button id="search" onclick="search_onclick()">Search</button>
         </div>
-        <div id="content" style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
-            <div style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
-                Vehicle Trace:
-            </div>
-            <div style="height:7px;background-color:#92BDF2;"></div>
+      </div>
+    </div>
+    <div id="content"
+         style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
+      <div
+        style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
+        Vehicle Trace:
+      </div>
+      <div style="height:7px;background-color:#92BDF2;"></div>
 
-            <div id="mychart"></div>
+      <div id="mychart"></div>
 
-            <div id="mytable"></div>
-        </div>
+      <div id="mytable"></div>
     </div>
-    <div id="footer">
-        Big Data Team @ Intel
-    </div> 
-    <script src="js/transport.js"></script>
-    <script type="text/javascript">        
-      function search_onclick(){
-        var vehicleId = document.getElementById('vehicleId').value
-        initChart("mychart", vehicleId)
-      }
-      setInterval(updateRecords, 1000, "mytable")
-    </script>   
+  </div>
+  <div id="footer">
+    Big Data Team @ Intel
+  </div>
+  <script src="js/transport.js"></script>
+  <script type="text/javascript">
+    function search_onclick() {
+      var vehicleId = document.getElementById('vehicleId').value
+      initChart("mychart", vehicleId)
+    }
+    setInterval(updateRecords, 1000, "mytable")
+  </script>
 </div>
 </body>
 </html>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala
index 7d3f35a..788f92a 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,9 +19,11 @@ package io.gearpump.streaming.examples.transport
 
 case class LocationInfo(id: String, row: Int, column: Int)
 
+// scalastyle:off equals.hash.code
 case class PassRecord(vehicleId: String, locationId: String, timeStamp: Long) {
   override def hashCode: Int = vehicleId.hashCode
 }
+// scalastyle:on equals.hash.code
 
 case class GetTrace(vehicleId: String)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala
index 1c280f1..33d7b54 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala
@@ -1,55 +1,56 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import io.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import scala.concurrent.duration._
-
-class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf){
-  import taskContext.{output, parallelism, taskId, scheduleOnce}
-
-  import system.dispatcher
-  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
-  private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism
-  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
-  private val mockCity = new MockCity(citySize)
-  private val recordGenerators: Array[PassRecordGenerator] = 
-    PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold)
-  
-  override def onStart(startTime: StartTime): Unit = {
-    self ! Message("start", System.currentTimeMillis())
-  }
-
-  override def onNext(msg: Message): Unit = {
-    recordGenerators.foreach(generator => 
-      output(Message(generator.getNextPassRecord(), System.currentTimeMillis())))
-    scheduleOnce(1 second)(self ! Message("continue", System.currentTimeMillis()))
-  }
-  
-  private def getIdentifier(taskId: TaskId): String = {
-    s"沪A${taskId.processorId}${taskId.index}"
-  }
-}
-
-object DataSource {
-  final val VEHICLE_NUM = "vehicle.number"
-  final val MOCK_CITY_SIZE = "mock.city.size"
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.gearpump.streaming.examples.transport
+
+import scala.concurrent.duration._
+
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+
+class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  import taskContext.{output, parallelism, scheduleOnce, taskId}
+  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
+  private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism
+  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
+  private val mockCity = new MockCity(citySize)
+  private val recordGenerators: Array[PassRecordGenerator] =
+    PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold)
+
+  override def onStart(startTime: StartTime): Unit = {
+    self ! Message("start", System.currentTimeMillis())
+  }
+
+  override def onNext(msg: Message): Unit = {
+    recordGenerators.foreach(generator =>
+      output(Message(generator.getNextPassRecord(), System.currentTimeMillis())))
+    scheduleOnce(1.second)(self ! Message("continue", System.currentTimeMillis()))
+  }
+
+  private def getIdentifier(taskId: TaskId): String = {
+    // scalastyle:off non.ascii.character.disallowed
+    s"沪A${taskId.processorId}${taskId.index}"
+    // scalastyle:on non.ascii.character.disallowed
+  }
+}
+
+object DataSource {
+  final val VEHICLE_NUM = "vehicle.number"
+  final val MOCK_CITY_SIZE = "mock.city.size"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
index 5f91883..f9dbbde 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,30 +18,29 @@
 package io.gearpump.streaming.examples.transport
 
 import java.util.concurrent.TimeUnit
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
 
 import akka.actor.Actor._
-import akka.actor.{Actor, Props}
+import akka.actor.{Actor, ActorRefFactory, Props}
 import akka.io.IO
 import akka.pattern.ask
+import spray.can.Http
+import spray.http.StatusCodes
+import spray.json._
+import spray.routing.{HttpService, Route}
+import upickle.default.write
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
 import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.appmaster.AppMaster
 import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
 import io.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer}
 import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
 import io.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication}
 import io.gearpump.util.Graph
-import spray.can.Http
-import spray.http.StatusCodes
-import spray.json._
-import spray.routing.HttpService
-import upickle.default.write
-
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
 
-class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf){
+class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import system.dispatcher
   import taskContext.appMaster
 
@@ -50,7 +49,8 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskC
   private var overSpeedRecords = List.empty[OverSpeedReport]
 
   override def onStart(startTime: StartTime): Unit = {
-    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get)
+    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
+      StreamApplication.DAG).get)
     inspector = dag.processors.find { kv =>
       val (_, processor) = kv
       processor.taskClass == classOf[VelocityInspector].getName
@@ -62,7 +62,7 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskC
   }
 
   override def receiveUnManagedMessage: Receive = {
-    case getTrace @ GetTrace(vehicleId: String) =>
+    case getTrace@GetTrace(vehicleId: String) =>
       val parallism = inspector._2.parallelism
       val processorId = inspector._1
       val analyzerTaskId = TaskId(processorId, (vehicleId.hashCode & Integer.MAX_VALUE) % parallism)
@@ -74,14 +74,14 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskC
         LOG.info(s"reporting $trace")
         requester ! trace
       }
-    case record@ OverSpeedReport(vehicleId, speed, timestamp, locationId) =>
+    case record@OverSpeedReport(vehicleId, speed, timestamp, locationId) =>
       LOG.info(s"vehicle $vehicleId is over speed, the speed is $speed km/h")
       overSpeedRecords :+= record
     case GetAllRecords =>
       sender ! QueryServer.OverSpeedRecords(overSpeedRecords.toArray.sortBy(_.timestamp))
       overSpeedRecords = List.empty[OverSpeedReport]
     case _ =>
-      //ignore
+    // Ignore
   }
 }
 
@@ -94,21 +94,22 @@ object QueryServer {
 
     import context.dispatcher
     implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-    def actorRefFactory = context
+    def actorRefFactory: ActorRefFactory = context
     implicit val system = context.system
 
     IO(Http) ! Http.Bind(self, interface = "0.0.0.0", port = 8080)
 
     override def receive: Receive = runRoute(webServer ~ staticRoute)
 
-    def webServer = {
-      path("trace" / PathElement) { vehicleId =>
+    def webServer: Route = {
+      path("trace" / Segment) { vehicleId =>
         get {
           onComplete((context.parent ? GetTrace(vehicleId)).asInstanceOf[Future[VehicleTrace]]) {
             case Success(trace: VehicleTrace) =>
               val json = write(trace)
               complete(pretty(json))
-            case Failure(ex) => complete(StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")
+            case Failure(ex) => complete(StatusCodes.InternalServerError,
+              s"An error occurred: ${ex.getMessage}")
           }
         }
       } ~
@@ -118,7 +119,8 @@ object QueryServer {
             case Success(records: OverSpeedRecords) =>
               val json = write(records)
               complete(pretty(json))
-            case Failure(ex) => complete(StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")
+            case Failure(ex) => complete(StatusCodes.InternalServerError,
+              s"An error occurred: ${ex.getMessage}")
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala
index fb3b7c1..a795277 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala
@@ -1,62 +1,69 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import io.gearpump.streaming.{StreamApplication, Processor}
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.util.Graph._
-import io.gearpump.util.{AkkaApp, Graph}
-
-object Transport extends AkkaApp with ArgumentsParser {
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "source"-> CLIOption[Int]("<how many task to generate data>", required = false, defaultValue = Some(10)),
-    "inspector"-> CLIOption[Int]("<how many over speed inspector>", required = false, defaultValue = Some(4)),
-    "vehicle"-> CLIOption[Int]("<how many vehicles's to generate>", required = false, defaultValue = Some(1000)),
-    "citysize"-> CLIOption[Int]("<the blocks number of the mock city>", required = false, defaultValue = Some(10)),
-    "threshold"-> CLIOption[Int]("<overdrive threshold, km/h>", required = false, defaultValue = Some(60)))
-  
-  def application(config: ParseResult): StreamApplication = {
-    val sourceNum = config.getInt("source")
-    val inspectorNum = config.getInt("inspector")
-    val vehicleNum = config.getInt("vehicle")
-    val citysize = config.getInt("citysize")
-    val threshold = config.getInt("threshold")
-    val source = Processor[DataSource](sourceNum)
-    val inspector = Processor[VelocityInspector](inspectorNum)
-    val queryServer = Processor[QueryServer](1)
-    val partitioner = new HashPartitioner
-
-    val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
-      withInt(DataSource.MOCK_CITY_SIZE, citysize).
-      withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold).
-      withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
-    StreamApplication("transport", Graph(source ~ partitioner ~> inspector, Node(queryServer)), userConfig)
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-    val context = ClientContext(akkaConf)
-    implicit val system = context.system
-    context.submit(application(config))
-    context.close()
-  }
-}
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.gearpump.streaming.examples.transport
+
+import io.gearpump.cluster.UserConfig
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import io.gearpump.partitioner.HashPartitioner
+import io.gearpump.streaming.{Processor, StreamApplication}
+import io.gearpump.util.Graph._
+import io.gearpump.util.{AkkaApp, Graph}
+
+/** A smart city transportation streaming application: data sources feed speed inspectors */
+object Transport extends AkkaApp with ArgumentsParser {
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "source" -> CLIOption[Int]("<how many task to generate data>", required = false,
+      defaultValue = Some(10)),
+    "inspector" -> CLIOption[Int]("<how many over speed inspector>", required = false,
+      defaultValue = Some(4)),
+    "vehicle" -> CLIOption[Int]("<how many vehicles's to generate>", required = false,
+      defaultValue = Some(1000)),
+    "citysize" -> CLIOption[Int]("<the blocks number of the mock city>", required = false,
+      defaultValue = Some(10)),
+    "threshold" -> CLIOption[Int]("<overdrive threshold, km/h>", required = false,
+      defaultValue = Some(60)))
+
+  def application(config: ParseResult): StreamApplication = {  // builds the DAG and UserConfig
+    val sourceNum = config.getInt("source")
+    val inspectorNum = config.getInt("inspector")
+    val vehicleNum = config.getInt("vehicle")
+    val citysize = config.getInt("citysize")
+    val threshold = config.getInt("threshold")
+    val source = Processor[DataSource](sourceNum)
+    val inspector = Processor[VelocityInspector](inspectorNum)
+    val queryServer = Processor[QueryServer](1)  // VelocityInspector looks it up as task index 0
+    val partitioner = new HashPartitioner
+
+    val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
+      withInt(DataSource.MOCK_CITY_SIZE, citysize).
+      withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold).
+      withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)  // fixed value, not a CLI option
+    StreamApplication("transport", Graph(source ~ partitioner ~> inspector,
+      Node(queryServer)), userConfig)  // queryServer is added as a node without edges
+  }
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+    implicit val system = context.system
+    context.submit(application(config))
+    context.close()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala
index 5265d45..b9be8d7 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala
@@ -1,116 +1,123 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import java.util.concurrent.TimeUnit
-
-import akka.actor.Actor._
-import akka.actor.ActorRef
-import akka.pattern.ask
-import io.gearpump.streaming.{StreamApplication, ProcessorDescription, DAG}
-import io.gearpump.streaming.appmaster.AppMaster
-import io.gearpump.streaming.examples.transport.generator.MockCity
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.partitioner.PartitionerDescription
-import AppMaster.{TaskActorRef, LookupTaskActorRef}
-import io.gearpump.util.Graph
-
-import scala.collection.immutable.Queue
-import scala.collection.mutable
-import scala.concurrent.Future
-
-class VelocityInspector(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import taskContext.appMaster
-  import system.dispatcher
-  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-  private val passRecords = mutable.Map.empty[String, Queue[PassRecord]]
-  private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get
-  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
-  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
-  private val mockCity = new MockCity(citySize)
-  private var queryServerActor: ActorRef = null
-
-  override def onStart(startTime: StartTime): Unit = {
-    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get)
-    val queryServer = dag.processors.find { kv =>
-      val (_, processor) = kv
-      processor.taskClass == classOf[QueryServer].getName
-    }.get
-    val queryServerTaskId = TaskId(queryServer._1, 0)
-    (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]].map {task =>
-      queryServerActor = task.task
-    }
-  }
-
-  import VelocityInspector._
-  override def onNext(msg: Message): Unit = {
-    msg.msg match {
-      case passRecord: PassRecord =>
-        val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord])
-        if(records.size > 0) {
-          val velocity = getVelocity(passRecord, records.last)
-          val formatted = "%.2f".format(velocity)
-          if(velocity > overdriveThreshold) {
-            if(velocity > fakePlateThreshold) {
-              LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, the speed is $formatted km/h")
-            }
-            if(queryServerActor != null) {
-              queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted, passRecord.timeStamp, passRecord.locationId)
-            }
-          }
-        }
-        passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM))
-    }
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case GetTrace(vehicleId) =>
-      val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord])
-      sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp))
-  }
-  
-  private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = {
-    val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId)
-    val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60)
-    distanceInKm / timeInHour
-  }
-  
-  private def getDistance(location1: String, location2: String): Long = {
-    mockCity.getDistance(location1, location2)
-  }
-}
-
-object VelocityInspector{
-  final val OVER_DRIVE_THRESHOLD = "overdrive.threshold"
-  final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold"
-  final val RECORDS_NUM = 20
-
-  class FiniteQueue[T](q: Queue[T]) {
-    def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = {
-      var result = q.enqueue(elem)
-      while (result.size > maxSize) {
-        result = result.dequeue._2
-      }
-      result
-    }
-  }
-
-  implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q)
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.gearpump.streaming.examples.transport
+
+import java.util.concurrent.TimeUnit
+import scala.collection.immutable.Queue
+import scala.collection.mutable
+import scala.concurrent.Future
+
+import akka.actor.Actor._
+import akka.actor.ActorRef
+import akka.pattern.ask
+
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.partitioner.PartitionerDescription
+import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
+import io.gearpump.streaming.examples.transport.generator.MockCity
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import io.gearpump.streaming.{DAG, ProcessorDescription, StreamApplication}
+import io.gearpump.util.Graph
+
+class VelocityInspector(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {  // reports vehicles faster than overdriveThreshold
+
+  import system.dispatcher
+  import taskContext.appMaster
+  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
+  private val passRecords = mutable.Map.empty[String, Queue[PassRecord]]  // keyed by vehicleId
+  private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get
+  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
+  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
+  private val mockCity = new MockCity(citySize)
+  private var queryServerActor: ActorRef = null  // set once the lookup in onStart completes
+
+  override def onStart(startTime: StartTime): Unit = {
+    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
+      StreamApplication.DAG).get)
+    val queryServer = dag.processors.find { kv =>
+      val (_, processor) = kv
+      processor.taskClass == classOf[QueryServer].getName
+    }.get  // throws if the DAG has no QueryServer processor
+    val queryServerTaskId = TaskId(queryServer._1, 0)
+    (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]]
+      .map { task =>
+        queryServerActor = task.task  // NOTE(review): written from a Future callback
+      }
+  }
+
+  import io.gearpump.streaming.examples.transport.VelocityInspector._
+  override def onNext(msg: Message): Unit = {
+    msg.msg match {
+      case passRecord: PassRecord =>
+        val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord])
+        if (records.size > 0) {  // speed needs a previous record
+          val velocity = getVelocity(passRecord, records.last)
+          val formatted = "%.2f".format(velocity)
+          if (velocity > overdriveThreshold) {
+            if (velocity > fakePlateThreshold) {
+              LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, " +
+                s"the speed is $formatted km/h")
+            }
+            if (queryServerActor != null) {  // report is dropped until the lookup resolves
+              queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted,
+                passRecord.timeStamp, passRecord.locationId)
+            }
+          }
+        }
+        passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM))
+    }  // NOTE(review): single case; other payloads would throw MatchError
+  }
+
+  override def receiveUnManagedMessage: Receive = {
+    case GetTrace(vehicleId) =>
+      val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord])
+      sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp))  // ascending timeStamp
+  }
+
+  private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = {
+    val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId)
+    val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60)
+    distanceInKm / timeInHour  // NOTE(review): divisor is 0 when timestamps are equal
+  }
+
+  private def getDistance(location1: String, location2: String): Long = {
+    mockCity.getDistance(location1, location2)
+  }
+}
+
+object VelocityInspector {
+  final val OVER_DRIVE_THRESHOLD = "overdrive.threshold"  // UserConfig key
+  final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold"  // UserConfig key
+  final val RECORDS_NUM = 20  // max pass records kept per vehicle
+
+  class FiniteQueue[T](q: Queue[T]) {  // adds a size-capped enqueue to Queue
+    def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = {
+      var result = q.enqueue(elem)
+      while (result.size > maxSize) {
+        result = result.dequeue._2  // drop from the front until within maxSize
+      }
+      result
+    }
+  }
+
+  import scala.language.implicitConversions
+
+  implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala
index d2786ef..ff78679 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala
@@ -1,86 +1,88 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport.generator
-
-import scala.util.Random
-import MockCity._
-
-class MockCity(size: Int) {
-  private val random = new Random()
-  private val directions = Array(UP, DOWN, LEFT, RIGHT)
-
-  def nextLocation(currentLocationId: String): String = {
-    val coordinate = idToCoordinate(currentLocationId)
-    val direction = directions(random.nextInt(4))
-    val newCoordinate = coordinate.addOffset(direction)
-    if(inCity(newCoordinate)) {
-      coordinateToId(newCoordinate)
-    } else {
-      nextLocation(currentLocationId)
-    }
-  }
-  
-  def getDistance(locationId1: String, locationId2: String): Long = {
-    val coordinate1 = idToCoordinate(locationId1)
-    val coordinate2 = idToCoordinate(locationId2)
-    val blocks = Math.abs(coordinate1.row - coordinate2.row) + Math.abs(coordinate1.column - coordinate2.column)
-    blocks * LENGTH_PER_BLOCK
-  }
-  
-  def randomLocationId(): String = {
-    val row = random.nextInt(size)
-    val column = random.nextInt(size)
-    coordinateToId(Coordinate(row, column))
-  }
-  
-  private def coordinateToId(coordinate: Coordinate): String = {
-    s"Id_${coordinate.row}_${coordinate.column}"
-  }
-  
-  private def idToCoordinate(locationId: String): Coordinate = {
-    val attr = locationId.split("_")
-    val row = attr(1).toInt
-    val column =attr(2).toInt
-    Coordinate(row, column)
-  }
-
-  private def inCity(coordinate: Coordinate): Boolean = {
-    coordinate.row >= 0 &&
-      coordinate.row < size &&
-      coordinate.column >= 0 &&
-      coordinate.column < size
-  }
-}
-
-object MockCity{
-  //The length of the mock city, km
-  final val LENGTH_PER_BLOCK = 5
-  //The minimal speed, km/h
-  final val MINIMAL_SPEED = 10
-  
-  final val UP = Coordinate(0, 1)
-  final val DOWN = Coordinate(0, -1)
-  final val LEFT = Coordinate(-1, 0)
-  final val RIGHT = Coordinate(1, 0)
-  
-  case class Coordinate(row: Int, column: Int) {
-    def addOffset(coordinate: Coordinate): Coordinate = {
-      Coordinate(this.row + coordinate.row, this.column + coordinate.column)
-    }
-  }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.gearpump.streaming.examples.transport.generator
+
+import scala.util.Random
+
+import io.gearpump.streaming.examples.transport.generator.MockCity._
+
+class MockCity(size: Int) {  // a size x size grid; location ids look like "Id_<row>_<column>"
+  private val random = new Random()
+  private val directions = Array(UP, DOWN, LEFT, RIGHT)
+
+  def nextLocation(currentLocationId: String): String = {
+    val coordinate = idToCoordinate(currentLocationId)
+    val direction = directions(random.nextInt(4))  // 4 == directions.length
+    val newCoordinate = coordinate.addOffset(direction)
+    if (inCity(newCoordinate)) {
+      coordinateToId(newCoordinate)
+    } else {  // NOTE(review): no move stays inside the grid when size == 1
+      nextLocation(currentLocationId)  // left the grid; retry with another random direction
+    }
+  }
+
+  def getDistance(locationId1: String, locationId2: String): Long = {
+    val coordinate1 = idToCoordinate(locationId1)
+    val coordinate2 = idToCoordinate(locationId2)
+    val blocks = Math.abs(coordinate1.row - coordinate2.row) +
+      Math.abs(coordinate1.column - coordinate2.column)
+    blocks * LENGTH_PER_BLOCK  // Manhattan distance in blocks times the per-block length
+  }
+
+  def randomLocationId(): String = {
+    val row = random.nextInt(size)
+    val column = random.nextInt(size)
+    coordinateToId(Coordinate(row, column))
+  }
+
+  private def coordinateToId(coordinate: Coordinate): String = {
+    s"Id_${coordinate.row}_${coordinate.column}"
+  }
+
+  private def idToCoordinate(locationId: String): Coordinate = {
+    val attr = locationId.split("_")  // inverse of coordinateToId: "Id", row, column
+    val row = attr(1).toInt
+    val column = attr(2).toInt
+    Coordinate(row, column)
+  }
+
+  private def inCity(coordinate: Coordinate): Boolean = {
+    coordinate.row >= 0 &&
+      coordinate.row < size &&
+      coordinate.column >= 0 &&
+      coordinate.column < size
+  }
+}
+
+object MockCity {
+  // The length of one block of the mock city, km
+  final val LENGTH_PER_BLOCK = 5
+  // The minimal speed, km/h
+  final val MINIMAL_SPEED = 10
+
+  final val UP = Coordinate(0, 1)  // offsets are (row, column): UP/DOWN change the column
+  final val DOWN = Coordinate(0, -1)
+  final val LEFT = Coordinate(-1, 0)  // LEFT/RIGHT change the row
+  final val RIGHT = Coordinate(1, 0)
+
+  case class Coordinate(row: Int, column: Int) {
+    def addOffset(coordinate: Coordinate): Coordinate = {
+      Coordinate(this.row + coordinate.row, this.column + coordinate.column)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
index 7a930d9..ee06b25 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
@@ -1,68 +1,69 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport.generator
-
-import io.gearpump.streaming.examples.transport.PassRecord
-import io.gearpump.util.LogUtil
-
-import scala.util.Random
-
-class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) {
-  private val LOG = LogUtil.getLogger(getClass)
-  LOG.info(s"Generate pass record for vehicle $vehicleId")
-  private var timeStamp = System.currentTimeMillis()
-
-  private var locationId = city.randomLocationId()
-  private val random = new Random()
-  private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE
-  private val (randomMin, randomRange) = {
-    val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat
-    val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat
-    val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE
-    val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES)
-    val randomRange = upperBound - randomMin
-    (randomMin.toInt, randomRange.toInt)
-  }
-  
-  def getNextPassRecord(): PassRecord = {
-    locationId = if(fakePlate) {
-      city.randomLocationId()
-    } else {
-      city.nextLocation(locationId)
-    }
-    timeStamp += (random.nextInt(randomRange) + randomMin)
-    PassRecord(vehicleId, locationId, timeStamp)
-  }
-}
-
-object PassRecordGenerator {
-  final val FAKE_PLATE_RATE = 0.01F
-  final val OVERDRIVE_RATE = 0.05F
-  final val TWOMINUTES = 2 * 60 * 1000
-  
-  def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int): Array[PassRecordGenerator] = {
-    var result = Map.empty[String, PassRecordGenerator]
-    val digitsNum = (Math.log10(generatorNum) + 1).toInt
-    for(i <- 1 to generatorNum) {
-      val vehicleId = prefix + s"%0${digitsNum}d".format(i)
-      val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold)
-      result += vehicleId -> generator
-    }
-    result.values.toArray
-  }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.gearpump.streaming.examples.transport.generator
+
+import scala.util.Random
+
+import io.gearpump.streaming.examples.transport.PassRecord
+import io.gearpump.util.LogUtil
+
+class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) {
+  private val LOG = LogUtil.getLogger(getClass)
+  LOG.info(s"Generate pass record for vehicle $vehicleId")
+  private var timeStamp = System.currentTimeMillis() // ms; advanced by getNextPassRecord()
+
+  private var locationId = city.randomLocationId() // starting position, replaced on each call
+  private val random = new Random()
+  private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE
+  private val (randomMin, randomRange) = { // minimum and spread (ms) of the time per block
+    val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat
+    val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat
+    val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE
+    val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES)
+    val randomRange = upperBound - randomMin
+    (randomMin.toInt, randomRange.toInt) // fractional milliseconds are truncated
+  }
+
+  def getNextPassRecord(): PassRecord = { // moves the vehicle once and stamps the new record
+    locationId = if (fakePlate) { // a fake plate jumps to an unrelated random location
+      city.randomLocationId()
+    } else {
+      city.nextLocation(locationId)
+    }
+    timeStamp += (random.nextInt(randomRange) + randomMin) // nextInt throws unless range > 0
+    PassRecord(vehicleId, locationId, timeStamp)
+  }
+}
+
+object PassRecordGenerator {
+  final val FAKE_PLATE_RATE = 0.01F // chance that a generator acts as a fake plate
+  final val OVERDRIVE_RATE = 0.05F // share of the time range allowed below the threshold time
+  final val TWOMINUTES = 2 * 60 * 1000 // ms; floor for the minimum interval between records
+
+  def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int)
+    : Array[PassRecordGenerator] = {
+    var result = Map.empty[String, PassRecordGenerator]
+    val digitsNum = (Math.log10(generatorNum) + 1).toInt // decimal digits in generatorNum
+    for (i <- 1 to generatorNum) {
+      val vehicleId = prefix + s"%0${digitsNum}d".format(i) // zero-padded, e.g. 7 -> "007"
+      val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold)
+      result += vehicleId -> generator
+    }
+    result.values.toArray // Map iteration order; not guaranteed to follow creation order
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
index adf44c0..75f9d60 100644
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
+++ b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,15 +17,14 @@
  */
 package io.gearpump.streaming.examples.transport
 
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.examples.transport.{VelocityInspector, DataSource}
-import io.gearpump.streaming.task.StartTime
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.examples.transport.VelocityInspector
 import org.mockito.Matchers._
 import org.mockito.Mockito._
-import org.scalatest.{Matchers, FlatSpec}
+import org.scalatest.{FlatSpec, Matchers}
+
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.task.StartTime
 
 class DataSourceSpec extends FlatSpec with Matchers {
   it should "create the pass record" in {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
index 94b89e1..b61fd43 100644
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
+++ b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,19 +17,19 @@
  */
 package io.gearpump.streaming.examples.transport
 
-import io.gearpump.streaming.examples.transport.Transport
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{TestUtil, MasterHarness}
-import io.gearpump.util.Util
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, Matchers, PropSpec}
+import scala.concurrent.Future
+import scala.util.Success
+
 import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
 
-import scala.util.Success
+import io.gearpump.cluster.ClientToMaster.SubmitApplication
+import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import io.gearpump.cluster.{MasterHarness, TestUtil}
 
-import scala.concurrent.Future
+class TransportSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
 
-class TransportSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
   override def beforeAll {
     startActorSystem()
   }
@@ -38,9 +38,9 @@ class TransportSpec extends PropSpec with PropertyChecks with Matchers with Befo
     shutdownActorSystem()
   }
 
-  override def config = TestUtil.DEFAULT_CONFIG
+  protected override def config = TestUtil.DEFAULT_CONFIG
 
-  property("Transport should succeed to submit application with required arguments"){
+  property("Transport should succeed to submit application with required arguments") {
     val requiredArgs = Array.empty[String]
     val optionalArgs = Array(
       "-source", "1",
@@ -59,10 +59,11 @@ class TransportSpec extends PropSpec with PropertyChecks with Matchers with Befo
     forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
       val args = requiredArgs ++ optionalArgs
 
-      Future {Transport.main(masterConfig, args)}
+      Future {
+        Transport.main(masterConfig, args)
+      }
       masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
       masterReceiver.reply(SubmitApplicationResult(Success(0)))
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
index 2f1495d..e91d91c 100644
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
+++ b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
@@ -1,32 +1,31 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport.generator
-
-import io.gearpump.streaming.examples.transport.generator.MockCity
-import org.scalatest.{Matchers, PropSpec}
-import org.scalatest.prop.PropertyChecks
-
-class MockCitySpec extends PropSpec with PropertyChecks with Matchers{
-
-  property("MockCity should maintain the location properly") {
-    val city = new MockCity(10)
-    val start = city.randomLocationId()
-    val nextLocation = city.nextLocation(start)
-    assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK)
-  }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.gearpump.streaming.examples.transport.generator
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class MockCitySpec extends PropSpec with PropertyChecks with Matchers {
+
+  property("MockCity should maintain the location properly") {
+    val city = new MockCity(10) // presumably the grid size; the constructor is out of view
+    val start = city.randomLocationId()
+    val nextLocation = city.nextLocation(start)
+    assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK) // one block apart
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
index d1ce32d..1c1901e 100644
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
+++ b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,19 +17,18 @@
  */
 package io.gearpump.streaming.examples.transport.generator
 
-import io.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
-import io.gearpump.streaming.examples.transport.generator.MockCity
-import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
 
-class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers{
+class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers {
 
-  property("PassRecordGenerator should generate pass record"){
+  property("PassRecordGenerator should generate pass record") {
     val id = "test"
     val city = new MockCity(10)
     val generator = new PassRecordGenerator(id, city, 60)
     val passrecord1 = generator.getNextPassRecord()
     val passrecord2 = generator.getNextPassRecord()
-    assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) == MockCity.LENGTH_PER_BLOCK)
+    assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) ==
+      MockCity.LENGTH_PER_BLOCK)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
index 64a0f8b..720e179 100644
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
+++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
index bb45651..28cf8cb 100644
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
+++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
 
 import java.util.HashMap;
 
-public class Sum  extends Task {
+public class Sum extends Task {
 
   private Logger LOG = super.LOG();
   private HashMap<String, Integer> wordCount = new HashMap<String, Integer>();
@@ -43,7 +43,7 @@ public class Sum  extends Task {
 
   @Override
   public void onNext(Message messagePayLoad) {
-    String word = (String)(messagePayLoad.msg());
+    String word = (String) (messagePayLoad.msg());
     Integer current = wordCount.get(word);
     if (current == null) {
       current = 0;

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
index 9e1e7d5..40054d3 100644
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -29,6 +29,7 @@ import io.gearpump.streaming.javaapi.Graph;
 import io.gearpump.streaming.javaapi.Processor;
 import io.gearpump.streaming.javaapi.StreamApplication;
 
+/** Java version of WordCount with Processor Graph API */
 public class WordCount {
 
   public static void main(String[] args) throws InterruptedException {
@@ -53,11 +54,9 @@ public class WordCount {
     Partitioner partitioner = new HashPartitioner();
     graph.addEdge(split, partitioner, sum);
 
-
     UserConfig conf = UserConfig.empty();
     StreamApplication app = new StreamApplication("wordcountJava", conf, graph);
 
-
     EmbeddedCluster localCluster = null;
 
     Boolean debugMode = System.getProperty("DEBUG") != null;

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index 6857017..3aefd7f 100644
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -34,6 +34,7 @@ import scala.Tuple2;
 import java.util.Iterator;
 import java.util.List;
 
+/** Java version of WordCount with high level DSL API */
 public class WordCount {
 
   public static void main(String[] args) throws InterruptedException {
@@ -61,14 +62,14 @@ public class WordCount {
       }
     }, "map");
 
-    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String,Integer>, String>() {
+    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() {
       @Override
       public String apply(Tuple2<String, Integer> tuple) {
         return tuple._1();
       }
     }, 1, "groupBy");
 
-    JavaStream<Tuple2<String, Integer>> wordcount =groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+    JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
       @Override
       public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
         return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
@@ -80,7 +81,4 @@ public class WordCount {
     app.run();
     context.close();
   }
-
-
-
 }