You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/08/15 15:04:33 UTC

[2/3] incubator-gearpump git commit: [GEARPUMP-188] use java.time.Instant for Task start time

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/js/transport.js
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/js/transport.js b/examples/streaming/transport/src/main/resources/transport/js/transport.js
deleted file mode 100644
index eef0fe9..0000000
--- a/examples/streaming/transport/src/main/resources/transport/js/transport.js
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-var myChart = echarts.init(document.getElementById("mychart"))
-
-echarts.util.mapData.params.params.football = {
-  getGeoJson: function (callback) {
-    $.ajax({
-      url: "../svg/beijing.svg",
-      dataType: 'xml',
-      success: function (xml) {
-        callback(xml)
-      }
-    });
-  }
-}
-
-function updateRecords(tableId) {
-  $.getJSON("records", function (json) {
-      var tableStr = "<table class=\"dataintable\" style=\"margin-left: 5px;\">";
-      tableStr += "<tr><th>Over Speed Vehicle ID</th><th>Speed</th><th>Location</th><th>Time</th></tr>";
-      var records = json.records;
-      for (var i = 0; i < Math.min(records.length, 20); i++) {
-        var record = records[i];
-        var vehicleId = record.vehicleId;
-        var location = record.locationId.split("_");
-        var speed = record.speed;
-        var row = location[1];
-        var column = location[2];
-        var time = new Date(Number(record.timestamp)).toLocaleTimeString().replace(/^\D*/, '');
-        tableStr += "<tr><td>" + vehicleId + "</td>";
-        tableStr += "<td>" + speed + "km/h </td>"
-        tableStr += "<td>(" + row + ", " + column + ")</td>";
-        tableStr += "<td>" + time + "</td></tr>";
-      }
-      if (records.length < 20) {
-        for (var i = records.length; i < 20; i++) {
-          tableStr += "<tr><td></td>";
-          tableStr += "<td> </td>"
-          tableStr += "<td> </td>";
-          tableStr += "<td> </td></tr>";
-        }
-      }
-      tableStr += "</table>"
-      document.getElementById(tableId).innerHTML = tableStr;
-    }
-  )
-}
-
-function initChart(chartid, vehicleId) {
-  // \u57fa\u4e8e\u51c6\u5907\u597d\u7684dom\uff0c\u521d\u59cb\u5316echarts\u56fe\u8868
-  $.getJSON("trace/" + vehicleId, function (json) {
-    // \u4e3aecharts\u5bf9\u8c61\u52a0\u8f7d\u6570\u636e
-    var records = json.records;
-    var timeLine = new Array(records.length);
-    var markPoints = new Array(records.length);
-    var options_ = new Array(records.length - 2);
-    for (var i = 0; i < records.length; i++) {
-      var record = records[i];
-      var vehicleId = record.vehicleId;
-      var location = record.locationId.split("_");
-      var row = location[1];
-      var column = location[2];
-      var time = new Date(Number(record.timeStamp)).toLocaleTimeString().replace(/^\D*/, '');
-      timeLine[i] = time;
-      var currentPonit = {name: "", value: i, geoCoord: [row * 90, column * 90]};
-      markPoints[i] = currentPonit;
-    }
-    options_[0] =
-    {
-      title: {
-        text: 'Vehicle trace'
-      },
-      tooltip: {
-        trigger: 'item'
-      },
-      toolbox: {
-        show: false,
-        feature: {
-          mark: {show: true},
-          dataView: {show: true, readOnly: false},
-          magicType: {show: true, type: ['line', 'bar']},
-          restore: {show: true},
-          saveAsImage: {show: true}
-        }
-      },
-      series: [
-        {
-          name: 'Vehicle trace',
-          type: 'map',
-          mapType: 'football',
-          mapLocation: {
-            y: 30,
-            height: 430
-          },
-          itemStyle: {
-            normal: {label: {show: false}},
-            emphasis: {label: {show: false}}
-          },
-          data: [
-            {name: 'City', hoverable: false, itemStyle: {normal: {label: {show: false}}}}
-          ],
-          markPoint: {
-            symbol: 'circle',
-            symbolSize: 8,
-            itemStyle: {
-              normal: {
-                borderWidth: 1,
-                color: 'blue',
-                lineStyle: {
-                  type: 'solid'
-                }
-              }
-            },
-            data: [markPoints[0]]
-          },
-          markLine: {
-            smooth: true,
-            effect: {
-              show: true,
-              scaleSize: 1.5,
-              period: 1.5,
-              color: '#fff'
-            },
-            itemStyle: {
-              normal: {
-                borderWidth: 2,
-                color: 'red',
-                lineStyle: {
-                  type: 'solid'
-                }
-              }
-            },
-            data: []
-          }
-        }
-      ]
-    }
-    for (var i = 1; i < markPoints.length; i++) {
-      options_[i] =
-      {
-        series: [
-          {
-            markPoint: {
-              data: [markPoints[i]]
-            },
-            markLine: {
-              data: []
-            }
-          }
-        ]
-      }
-    }
-    var option = {
-      timeline: {
-        type: 'number',
-        playInterval: 500,
-        autoPlay: true,
-        data: timeLine
-      },
-      options: options_
-    };
-    myChart.setOption(option);
-  });
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg b/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg
deleted file mode 100644
index 5342c24..0000000
--- a/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg
+++ /dev/null
@@ -1,199 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<!-- Generator: Adobe Illustrator 14.0.0, SVG Export Plug-In . SVG Version: 6.00 Build 43363)  -->
-<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
-<svg version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px" width="1103px"
-	 height="1115px" viewBox="0 0 200 202" enable-background="new 0 0 1103 1115" xml:space="preserve">
-<g id="\u80cc\u666f">
-
-		<polygon fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		719,377 533,377 487,398 491,563 469,571 469,682 504,682 546,676 661,667 672,672 710,672 740,676 751,645 746,567 725,563
-		719,510 	"/>
-
-		<path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d="
-		M359,533V402l-7-76l36-18l333-10l76,65v315l-38,41h-49l-37,6H534l-81,31h-37c-33.5-0.5-56.5-54.5-57-70V533z"/>
-
-		<path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d="
-		M252,571v44l11,39l16,41v71l7.5,27.5L290,811l11,13l16,1l447-4l48-10l51-45l7-25l8-70V370c1.5-12.5-4.5-24.5-11-32l-103-96l-25-15
-		l-21-6l-392,17l-19,8l-17,24l-35,28l-4,10l4,38l-3,40v47l3,37l-3,40V571z"/>
-
-		<path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d="
-		M259,133l63-8l52-13l57-19h49l241,6l69,26h51c18.5,2.5,27.5,6.5,37,16l82,121l16,28l9,20l11,37l11,21l25,37l7,122l11,80v24v73l6,30
-		l-6,17L944,857l-93,45l-46,62l-17,10l-49,1c-17.5,1.5-37.5,20.5-43,35l-19,52l-18,12l-84,27c-5.5,1.5-21.5,6.5-27-13l-6-37
-		c-2.5-8.5-3.5-22.5-27-23l-151-18c-15.5-1.5-30.5,5.5-50,18l-23,12c-17.5,8.5-41.5-2.5-49-30l-15-60v-35c0,0-49.5-86.5-55-90
-		s-35-9-35-9l-38-65l-5-23l-33-60l-6-22l13-46v-89l2-43l-2-63l10-37l19-111l-3-27c0.5-8.5,5.5-30.5,24-36l29-6l25-15h46l17-10l16-22
-		L259,133z"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		1112,523 990,523 967,530 878,530 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="1030" y1="405" x2="1112" y2="406"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		1112,48 962,122 777.5,343.5 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		856,-7 856,10 848,31 797,81 788,93 763,186 706,298 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		559.5,303.5 565,270 547,207 422,-7 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		-10,1003 13,995 25,988 42,931 42,912 47,898 70,870 74,853 66,789 74,779 110,767 143,742 152,717 280,647 331,633 359.5,617.5
-		"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		456,1115 449,1049 449,1022 467,986 466.5,977.5 455.5,868.5 460,842 460.5,754.5 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		1103,1115 1066,1044 1042,1011 1015,982 984,927 952,888 901,849 778,703 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="1036.5" y1="768.5" x2="1112" y2="810"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="878" y1="671" x2="1112" y2="682"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="1019" y1="-7" x2="955" y2="129"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		-10,533 540,536 671,531 878,530 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		0,468 132,468 168,457 175,457 201,470 212,470 360,470 422,474 458,474 494,472 539,472 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		539,477 590,477 595,474 623,474 630,470 732,470 753,476 763,477 825,502 843,503 929,502 984.5,496.5 1037,500 1091,500
-		1112,503 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		494,565 533.5,564.5 556,562 616,562 626,559 670,559 680,562 725,562 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		1030,405 971,405 885,435 683,435 589.5,435.5 539,439 523,439 372,439 254,439 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		360,618 389,602 403,600 469,602 591.5,601.5 625.5,591.5 652,589 699,588 987,589 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		716,377 748,377 778,344 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		-10,596 6,603 47,603 63,588 91,572 394,572 472,571 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		725,544 689,544 674,559 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		539,406 474.5,410.5 458,415 424.5,417.5 400.5,414.5 378,413 305.5,379.5 283.5,385.5 254,384 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		878,530 864,541 725,549 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		213,688 231,694 278,694 397,682 467,682 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="469" y1="682" x2="460.5" y2="754.5"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		308,755 313,638 313,496 316,487 315.5,420.5 313,399 313,344 282,280 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		352,327 322,338 299,344 254,344 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		162,-7 206,13 240,48 287,120 294,175 312,193 312,206 324,219.5 330,280 346,298 352,327 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		332.5,-9 357,41 364,70 369,109 379,124 381,155 379,159 373,159 370,168 375,265 391,322 401,414 401,474 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		477,93 482,229 486,361 489,372 490,399 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="560" y1="304" x2="556" y2="377"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		537,377 542,536 542,694 539,703 535,716 535,727 530,825 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		613,596 615,671 624,924 666,921 666,930 670,935 678,991 755,1061 766,1115 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="600" y1="227" x2="604" y2="377"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		642,434 640,377 638,302 638,227 642,99 653,10 653,0 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		1037,769 951,709 951,677 870,622 678,623 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		653,11 670,58 668,85 668,110 664,129 664,146 668,176 667,199 667,227 672,369 664,369 664,377 672,609 678,622 680,725 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="878" y1="671" x2="745" y2="671"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		1112,470 973,472 965,474 770,474 763,478 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="845" y1="434" x2="842" y2="696"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		763,530 763,383 751,372 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		812.5,125 816,205 861,243 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		667,172 619,174 578,175 494,178 434,180 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		720,271 709,269 392,272 340,275 329,279 282,283 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		826,797 837,810 840.5,831.5 885.5,884.5 967.16,1014.78 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		888,1003 1025,917 1050,905 1064.5,888.5 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		1071,1051 1051,1065 1024,1081 987,1065 976,1051 961,1040 898,1017 861,959 851,935 837,923 813,888 763,838 757,825 751,769
-		749,737 747,682 737,677 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		1009,988 967.28,1015.13 933.5,1117.5 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		0,984 9,974 15,957 15,950 15,939 26,909 34,901 45,897 60,878 58,867 63,856 64,844 61,825 64,818 61,776 104,769 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		917,866 845,900 797,919 781,915 742,923 725,923 667,930 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		-3,1076 93,1076 148,1056 248,1055 270,1047 333,1009 458,909 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#14A97E" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		239,129 241,154 251,172 270,176 292,176 295,187 312,193 346,192 352,184 371,184 374,237 377,272 391,327 401,414 422,418
-		449,416 472,411 530,406 539,414 542,677 560,703 535,725 530,810 	"/>
-</g>
-<g id="\u5c42_1" display="none">
-
-		<polyline display="inline" fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#0096C0" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		797,623.5 797,362.5 778,345.5 792,328.5 725,273.5 706,270.5 386.5,273 340,277.5 333,279.5 299,282.5 	"/>
-</g>
-</svg>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/transport.html
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/transport.html b/examples/streaming/transport/src/main/resources/transport/transport.html
deleted file mode 100644
index baee931..0000000
--- a/examples/streaming/transport/src/main/resources/transport/transport.html
+++ /dev/null
@@ -1,88 +0,0 @@
-<!DOCTYPE html>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~      http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<html>
-
-<head>
-  <meta charset="utf-8">
-  <link rel=stylesheet type=text/css href="css/custom.css">
-  <script src="http://echarts.baidu.com/build/source/echarts-all.js"></script>
-  <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
-</head>
-
-<body style="background-color:#F2F2F2">
-<div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;">
-  <div style="height:0px"></div>
-  <div id="header">
-    <div
-      style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
-      Big Data Transport Monitoring Demo
-    </div>
-  </div>
-  <div id="body">
-    <div id="Menu">
-      <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;">
-        <!-- form to post to accompany to get accompanying cars -->
-
-        <table style="width:100%">
-          <tr>
-            <td class="sidebar-label">Vehicle Id:</td>
-          </tr>
-          <tr>
-            <td class="sidebar-label"></td>
-          </tr>
-          <tr>
-            <td style="vertical-align:top;">
-              <input id="vehicleId" class="sidebar" type="text" name="vehicleId"/>
-            </td>
-          </tr>
-        </table>
-        <div class="splitter"></div>
-        <div>
-          <button id="search" onclick="search_onclick()">Search</button>
-        </div>
-      </div>
-    </div>
-    <div id="content"
-         style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
-      <div
-        style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
-        Vehicle Trace:
-      </div>
-      <div style="height:7px;background-color:#92BDF2;"></div>
-
-      <div id="mychart"></div>
-
-      <div id="mytable"></div>
-    </div>
-  </div>
-  <div id="footer">
-    Big Data Team @ Intel
-  </div>
-  <script src="js/transport.js"></script>
-  <script type="text/javascript">
-    function search_onclick() {
-      var vehicleId = document.getElementById('vehicleId').value
-      initChart("mychart", vehicleId)
-    }
-    setInterval(updateRecords, 1000, "mytable")
-  </script>
-</div>
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
deleted file mode 100644
index 0aaf72c..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-/**
- * A location identifier together with a row and a column.
- *
- * NOTE(review): row/column presumably index the mock city grid - confirm
- * against the generator code, which is not part of this view.
- *
- * @param id     identifier of the location
- * @param row    row of the location
- * @param column column of the location
- */
-case class LocationInfo(
-    id: String,
-    row: Int,
-    column: Int)
-
-// scalastyle:off equals.hash.code
-/**
- * One sighting of a vehicle at a location.
- *
- * The hash code is taken from `vehicleId` alone, so all records of one vehicle
- * hash identically, while the compiler-generated `equals` still compares all
- * three fields (hence the scalastyle suppression around this class).
- * NOTE(review): presumably this keeps one vehicle's records together under
- * hash partitioning - confirm against the application's partitioner.
- *
- * @param vehicleId  identifier of the vehicle
- * @param locationId identifier of the location; the web UI splits it on "_"
- *                   and shows the 2nd and 3rd fields as row and column
- * @param timeStamp  time of the sighting; the web UI passes it to JavaScript's
- *                   `Date` constructor, i.e. treats it as epoch milliseconds
- */
-case class PassRecord(
-    vehicleId: String,
-    locationId: String,
-    timeStamp: Long) {
-
-  override def hashCode: Int = vehicleId.hashCode()
-}
-// scalastyle:on equals.hash.code
-
-/**
- * Message carrying the id of a vehicle.
- *
- * NOTE(review): by its name this requests the trace of that vehicle; the
- * sender and receiver are not visible in this part of the patch - confirm.
- *
- * @param vehicleId identifier of the vehicle
- */
-case class GetTrace(vehicleId: String)
-
-/**
- * A collection of pass records.
- *
- * Because `records` is an `Array`, the generated `equals`/`hashCode` treat it
- * by reference: two instances holding equal but distinct arrays are not equal.
- *
- * @param records the pass records making up the trace
- */
-case class VehicleTrace(records: Array[PassRecord])
-
-/**
- * Report about a vehicle that was driving too fast.
- *
- * @param vehicleId  identifier of the vehicle
- * @param speed      speed as text; the web UI appends "km/h" when showing it
- * @param timestamp  time of the report; the web UI passes it to JavaScript's
- *                   `Date` constructor, i.e. treats it as epoch milliseconds
- * @param locationId identifier of the location; the web UI splits it on "_"
- *                   and shows the 2nd and 3rd fields as row and column
- */
-case class OverSpeedReport(
-    vehicleId: String,
-    speed: String,
-    timestamp: Long,
-    locationId: String)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
deleted file mode 100644
index 555e850..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import scala.concurrent.duration._
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-
-/**
- * Source task that emits pass records produced by a set of record generators.
- *
- * On start the task sends itself a message. Every message it receives, whatever
- * its content, makes it emit one record per generator and schedule the next
- * self-message one second later, so emission keeps repeating.
- *
- * Configuration read at construction, each through `.get`
- * (NOTE(review): presumably a missing key fails construction - confirm against
- * `UserConfig.getInt`):
- *  - `VelocityInspector.OVER_DRIVE_THRESHOLD`, handed to the generators
- *  - `DataSource.VEHICLE_NUM`, integer-divided by the task parallelism
- *  - `DataSource.MOCK_CITY_SIZE`, handed to `MockCity`
- */
-class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import taskContext.{output, parallelism, scheduleOnce, taskId}
-
-  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
-  // The configured vehicle count is shared out across the parallel instances.
-  private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism
-  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
-  private val mockCity = new MockCity(citySize)
-  private val recordGenerators: Array[PassRecordGenerator] =
-    PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold)
-
-  /** Kicks off the emission loop with a first self-message. */
-  override def onStart(startTime: StartTime): Unit = triggerSelf("start")
-
-  /** Emits one record per generator, then re-arms the loop one second later. */
-  override def onNext(msg: Message): Unit = {
-    for (generator <- recordGenerators) {
-      val record = generator.getNextPassRecord()
-      output(Message(record, System.currentTimeMillis()))
-    }
-    scheduleOnce(1.second)(triggerSelf("continue"))
-  }
-
-  /** Sends this task a message with the given payload, stamped with the current time. */
-  private def triggerSelf(label: String): Unit = {
-    self ! Message(label, System.currentTimeMillis())
-  }
-
-  /** Builds the identifier prefix from the processor id and index of this task. */
-  private def getIdentifier(taskId: TaskId): String = {
-    // scalastyle:off non.ascii.character.disallowed
-    s"\u6caaA${taskId.processorId}${taskId.index}"
-    // scalastyle:on non.ascii.character.disallowed
-  }
-}
-
-/** Configuration keys read by [[DataSource]]. */
-object DataSource {
-  // Number of vehicles; each task instance takes this value divided by the parallelism.
-  final val VEHICLE_NUM = "vehicle.number"
-  // Size handed to MockCity.
-  final val MOCK_CITY_SIZE = "mock.city.size"
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
deleted file mode 100644
index ff3b4b4..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-import akka.actor.Actor._
-import akka.actor.{Actor, ActorRefFactory, Props}
-import akka.io.IO
-import akka.pattern.ask
-import spray.can.Http
-import spray.http.StatusCodes
-import spray.json._
-import spray.routing.{HttpService, Route}
-import upickle.default.write
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import org.apache.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication}
-import org.apache.gearpump.util.Graph
-
-class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import system.dispatcher
-  import taskContext.appMaster
-
-  var inspector: (ProcessorId, ProcessorDescription) = null
-  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-  private var overSpeedRecords = List.empty[OverSpeedReport]
-
-  override def onStart(startTime: StartTime): Unit = {
-    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
-      StreamApplication.DAG).get)
-    inspector = dag.processors.find { kv =>
-      val (_, processor) = kv
-      processor.taskClass == classOf[VelocityInspector].getName
-    }.get
-    taskContext.actorOf(Props(new WebServer))
-  }
-
-  override def onNext(msg: Message): Unit = {
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case getTrace@GetTrace(vehicleId: String) =>
-      val parallism = inspector._2.parallelism
-      val processorId = inspector._1
-      val analyzerTaskId = TaskId(processorId, (vehicleId.hashCode & Integer.MAX_VALUE) % parallism)
-      val requester = sender
-      (appMaster ? LookupTaskActorRef(analyzerTaskId))
-        .asInstanceOf[Future[TaskActorRef]].flatMap { task =>
-        (task.task ? getTrace).asInstanceOf[Future[VehicleTrace]]
-      }.map { trace =>
-        LOG.info(s"reporting $trace")
-        requester ! trace
-      }
-    case record@OverSpeedReport(vehicleId, speed, timestamp, locationId) =>
-      LOG.info(s"vehicle $vehicleId is over speed, the speed is $speed km/h")
-      overSpeedRecords :+= record
-    case GetAllRecords =>
-      sender ! QueryServer.OverSpeedRecords(overSpeedRecords.toArray.sortBy(_.timestamp))
-      overSpeedRecords = List.empty[OverSpeedReport]
-    case _ =>
-    // Ignore
-  }
-}
-
-object QueryServer {
-  object GetAllRecords
-
-  case class OverSpeedRecords(records: Array[OverSpeedReport])
-
-  class WebServer extends Actor with HttpService {
-
-    import context.dispatcher
-    implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-    def actorRefFactory: ActorRefFactory = context
-    implicit val system = context.system
-
-    IO(Http) ! Http.Bind(self, interface = "0.0.0.0", port = 8080)
-
-    override def receive: Receive = runRoute(webServer ~ staticRoute)
-
-    def webServer: Route = {
-      path("trace" / Segment) { vehicleId =>
-        get {
-          onComplete((context.parent ? GetTrace(vehicleId)).asInstanceOf[Future[VehicleTrace]]) {
-            case Success(trace: VehicleTrace) =>
-              val json = write(trace)
-              complete(pretty(json))
-            case Failure(ex) => complete(StatusCodes.InternalServerError,
-              s"An error occurred: ${ex.getMessage}")
-          }
-        }
-      } ~
-      path("records") {
-        get {
-          onComplete((context.parent ? GetAllRecords).asInstanceOf[Future[OverSpeedRecords]]) {
-            case Success(records: OverSpeedRecords) =>
-              val json = write(records)
-              complete(pretty(json))
-            case Failure(ex) => complete(StatusCodes.InternalServerError,
-              s"An error occurred: ${ex.getMessage}")
-          }
-        }
-      }
-    }
-
-    val staticRoute = {
-      pathEndOrSingleSlash {
-        getFromResource("transport/transport.html")
-      } ~
-        pathPrefix("css") {
-          get {
-            getFromResourceDirectory("transport/css")
-          }
-        } ~
-        pathPrefix("svg") {
-          get {
-            getFromResourceDirectory("transport/svg")
-          }
-        } ~
-        pathPrefix("js") {
-          get {
-            getFromResourceDirectory("transport/js")
-          }
-        }
-    }
-
-    private def pretty(json: String): String = {
-      json.parseJson.prettyPrint
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
deleted file mode 100644
index 5beb2e1..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
-import org.apache.gearpump.streaming.{Processor, StreamApplication}
-import org.apache.gearpump.util.Graph._
-import org.apache.gearpump.util.{AkkaApp, Graph}
-
-/** A city smart transportation streaming application */
-object Transport extends AkkaApp with ArgumentsParser {
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "source" -> CLIOption[Int]("<how many task to generate data>", required = false,
-      defaultValue = Some(10)),
-    "inspector" -> CLIOption[Int]("<how many over speed inspector>", required = false,
-      defaultValue = Some(4)),
-    "vehicle" -> CLIOption[Int]("<how many vehicles's to generate>", required = false,
-      defaultValue = Some(1000)),
-    "citysize" -> CLIOption[Int]("<the blocks number of the mock city>", required = false,
-      defaultValue = Some(10)),
-    "threshold" -> CLIOption[Int]("<overdrive threshold, km/h>", required = false,
-      defaultValue = Some(60)))
-
-  def application(config: ParseResult): StreamApplication = {
-    val sourceNum = config.getInt("source")
-    val inspectorNum = config.getInt("inspector")
-    val vehicleNum = config.getInt("vehicle")
-    val citysize = config.getInt("citysize")
-    val threshold = config.getInt("threshold")
-    val source = Processor[DataSource](sourceNum)
-    val inspector = Processor[VelocityInspector](inspectorNum)
-    val queryServer = Processor[QueryServer](1)
-    val partitioner = new HashPartitioner
-
-    val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
-      withInt(DataSource.MOCK_CITY_SIZE, citysize).
-      withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold).
-      withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
-    StreamApplication("transport", Graph(source ~ partitioner ~> inspector,
-      Node(queryServer)), userConfig)
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-    val context = ClientContext(akkaConf)
-    implicit val system = context.system
-    context.submit(application(config))
-    context.close()
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
deleted file mode 100644
index 4d9bd04..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import java.util.concurrent.TimeUnit
-import scala.collection.immutable.Queue
-import scala.collection.mutable
-import scala.concurrent.Future
-
-import akka.actor.Actor._
-import akka.actor.ActorRef
-import akka.pattern.ask
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.examples.transport.generator.MockCity
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import org.apache.gearpump.streaming.{DAG, ProcessorDescription, StreamApplication}
-import org.apache.gearpump.util.Graph
-
-class VelocityInspector(taskContext: TaskContext, conf: UserConfig)
-  extends Task(taskContext, conf) {
-
-  import system.dispatcher
-  import taskContext.appMaster
-  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-  private val passRecords = mutable.Map.empty[String, Queue[PassRecord]]
-  private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get
-  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
-  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
-  private val mockCity = new MockCity(citySize)
-  private var queryServerActor: ActorRef = null
-
-  override def onStart(startTime: StartTime): Unit = {
-    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
-      StreamApplication.DAG).get)
-    val queryServer = dag.processors.find { kv =>
-      val (_, processor) = kv
-      processor.taskClass == classOf[QueryServer].getName
-    }.get
-    val queryServerTaskId = TaskId(queryServer._1, 0)
-    (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]]
-      .map { task =>
-        queryServerActor = task.task
-      }
-  }
-
-  import org.apache.gearpump.streaming.examples.transport.VelocityInspector._
-  override def onNext(msg: Message): Unit = {
-    msg.msg match {
-      case passRecord: PassRecord =>
-        val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord])
-        if (records.size > 0) {
-          val velocity = getVelocity(passRecord, records.last)
-          val formatted = "%.2f".format(velocity)
-          if (velocity > overdriveThreshold) {
-            if (velocity > fakePlateThreshold) {
-              LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, " +
-                s"the speed is $formatted km/h")
-            }
-            if (queryServerActor != null) {
-              queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted,
-                passRecord.timeStamp, passRecord.locationId)
-            }
-          }
-        }
-        passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM))
-    }
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case GetTrace(vehicleId) =>
-      val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord])
-      sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp))
-  }
-
-  private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = {
-    val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId)
-    val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60)
-    distanceInKm / timeInHour
-  }
-
-  private def getDistance(location1: String, location2: String): Long = {
-    mockCity.getDistance(location1, location2)
-  }
-}
-
-object VelocityInspector {
-  final val OVER_DRIVE_THRESHOLD = "overdrive.threshold"
-  final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold"
-  final val RECORDS_NUM = 20
-
-  class FiniteQueue[T](q: Queue[T]) {
-    def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = {
-      var result = q.enqueue(elem)
-      while (result.size > maxSize) {
-        result = result.dequeue._2
-      }
-      result
-    }
-  }
-
-  import scala.language.implicitConversions
-
-  implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
deleted file mode 100644
index 60e0bcf..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import scala.util.Random
-
-import org.apache.gearpump.streaming.examples.transport.generator.MockCity._
-
-class MockCity(size: Int) {
-  private val random = new Random()
-  private val directions = Array(UP, DOWN, LEFT, RIGHT)
-
-  def nextLocation(currentLocationId: String): String = {
-    val coordinate = idToCoordinate(currentLocationId)
-    val direction = directions(random.nextInt(4))
-    val newCoordinate = coordinate.addOffset(direction)
-    if (inCity(newCoordinate)) {
-      coordinateToId(newCoordinate)
-    } else {
-      nextLocation(currentLocationId)
-    }
-  }
-
-  def getDistance(locationId1: String, locationId2: String): Long = {
-    val coordinate1 = idToCoordinate(locationId1)
-    val coordinate2 = idToCoordinate(locationId2)
-    val blocks = Math.abs(coordinate1.row - coordinate2.row) +
-      Math.abs(coordinate1.column - coordinate2.column)
-    blocks * LENGTH_PER_BLOCK
-  }
-
-  def randomLocationId(): String = {
-    val row = random.nextInt(size)
-    val column = random.nextInt(size)
-    coordinateToId(Coordinate(row, column))
-  }
-
-  private def coordinateToId(coordinate: Coordinate): String = {
-    s"Id_${coordinate.row}_${coordinate.column}"
-  }
-
-  private def idToCoordinate(locationId: String): Coordinate = {
-    val attr = locationId.split("_")
-    val row = attr(1).toInt
-    val column = attr(2).toInt
-    Coordinate(row, column)
-  }
-
-  private def inCity(coordinate: Coordinate): Boolean = {
-    coordinate.row >= 0 &&
-      coordinate.row < size &&
-      coordinate.column >= 0 &&
-      coordinate.column < size
-  }
-}
-
-object MockCity {
-  // The length of the mock city, km
-  final val LENGTH_PER_BLOCK = 5
-  // The minimal speed, km/h
-  final val MINIMAL_SPEED = 10
-
-  final val UP = Coordinate(0, 1)
-  final val DOWN = Coordinate(0, -1)
-  final val LEFT = Coordinate(-1, 0)
-  final val RIGHT = Coordinate(1, 0)
-
-  case class Coordinate(row: Int, column: Int) {
-    def addOffset(coordinate: Coordinate): Coordinate = {
-      Coordinate(this.row + coordinate.row, this.column + coordinate.column)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
deleted file mode 100644
index e8c1c59..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import scala.util.Random
-
-import org.apache.gearpump.streaming.examples.transport.PassRecord
-import org.apache.gearpump.util.LogUtil
-
-class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) {
-  private val LOG = LogUtil.getLogger(getClass)
-  LOG.info(s"Generate pass record for vehicle $vehicleId")
-  private var timeStamp = System.currentTimeMillis()
-
-  private var locationId = city.randomLocationId()
-  private val random = new Random()
-  private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE
-  private val (randomMin, randomRange) = {
-    val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat
-    val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat
-    val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE
-    val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES)
-    val randomRange = upperBound - randomMin
-    (randomMin.toInt, randomRange.toInt)
-  }
-
-  def getNextPassRecord(): PassRecord = {
-    locationId = if (fakePlate) {
-      city.randomLocationId()
-    } else {
-      city.nextLocation(locationId)
-    }
-    timeStamp += (random.nextInt(randomRange) + randomMin)
-    PassRecord(vehicleId, locationId, timeStamp)
-  }
-}
-
-object PassRecordGenerator {
-  final val FAKE_PLATE_RATE = 0.01F
-  final val OVERDRIVE_RATE = 0.05F
-  final val TWOMINUTES = 2 * 60 * 1000
-
-  def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int)
-    : Array[PassRecordGenerator] = {
-    var result = Map.empty[String, PassRecordGenerator]
-    val digitsNum = (Math.log10(generatorNum) + 1).toInt
-    for (i <- 1 to generatorNum) {
-      val vehicleId = prefix + s"%0${digitsNum}d".format(i)
-      val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold)
-      result += vehicleId -> generator
-    }
-    result.values.toArray
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
deleted file mode 100644
index 1f525ae..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
-
-class DataSourceSpec extends FlatSpec with Matchers {
-  it should "create the pass record" in {
-    val vehicleNum = 2
-    val context = MockUtil.mockTaskContext
-
-    val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
-      withInt(DataSource.MOCK_CITY_SIZE, 10).
-      withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, 60).
-      withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
-
-    val source = new DataSource(context, userConfig)
-    source.onStart(StartTime(0))
-    source.onNext(Message("start"))
-    verify(context, times(vehicleNum)).output(any[Message])
-    source.onStop()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
deleted file mode 100644
index 2f83de5..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
-
-import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
-import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-
-class TransportSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
-
-  override def beforeAll {
-    startActorSystem()
-  }
-
-  override def afterAll {
-    shutdownActorSystem()
-  }
-
-  protected override def config = TestUtil.DEFAULT_CONFIG
-
-  property("Transport should succeed to submit application with required arguments") {
-    val requiredArgs = Array.empty[String]
-    val optionalArgs = Array(
-      "-source", "1",
-      "-inspector", "1",
-      "-vehicle", "100",
-      "-citysize", "10",
-      "-threshold", "60")
-
-    val args = {
-      Table(
-        ("requiredArgs", "optionalArgs"),
-        (requiredArgs, optionalArgs)
-      )
-    }
-    val masterReceiver = createMockMaster()
-    forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
-      val args = requiredArgs ++ optionalArgs
-
-      Future {
-        Transport.main(masterConfig, args)
-      }
-      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
-      masterReceiver.reply(SubmitApplicationResult(Success(0)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
deleted file mode 100644
index ba4eb2d..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class MockCitySpec extends PropSpec with PropertyChecks with Matchers {
-
-  property("MockCity should maintain the location properly") {
-    val city = new MockCity(10)
-    val start = city.randomLocationId()
-    val nextLocation = city.nextLocation(start)
-    assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
deleted file mode 100644
index f0eebbf..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers {
-
-  property("PassRecordGenerator should generate pass record") {
-    val id = "test"
-    val city = new MockCity(10)
-    val generator = new PassRecordGenerator(id, city, 60)
-    val passrecord1 = generator.getNextPassRecord()
-    val passrecord2 = generator.getNextPassRecord()
-    assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) ==
-      MockCity.LENGTH_PER_BLOCK)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
index 76069c1..0a8fb4f 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
@@ -21,9 +21,10 @@ package org.apache.gearpump.streaming.examples.wordcountjava;
 import org.apache.gearpump.Message;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.streaming.javaapi.Task;
-import org.apache.gearpump.streaming.task.StartTime;
 import org.apache.gearpump.streaming.task.TaskContext;
 
+import java.time.Instant;
+
 public class Split extends Task {
 
   public static String TEXT = "This is a good start for java! bingo! bingo! ";
@@ -37,7 +38,7 @@ public class Split extends Task {
   }
 
   @Override
-  public void onStart(StartTime startTime) {
+  public void onStart(Instant startTime) {
     self().tell(new Message("start", now()), self());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
index 89c3b14..3daa6e0 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
@@ -21,10 +21,10 @@ package org.apache.gearpump.streaming.examples.wordcountjava;
 import org.apache.gearpump.Message;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.streaming.javaapi.Task;
-import org.apache.gearpump.streaming.task.StartTime;
 import org.apache.gearpump.streaming.task.TaskContext;
 import org.slf4j.Logger;
 
+import java.time.Instant;
 import java.util.HashMap;
 
 public class Sum extends Task {
@@ -37,7 +37,7 @@ public class Sum extends Task {
   }
 
   @Override
-  public void onStart(StartTime startTime) {
+  public void onStart(Instant startTime) {
     //skip
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
index ae63f10..af3c04c 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
@@ -18,16 +18,17 @@
 
 package org.apache.gearpump.streaming.examples.wordcount
 
+import java.time.Instant
 import java.util.concurrent.TimeUnit
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     self ! Message("start")
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
index c3fa82a..dbefc93 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.streaming.examples.wordcount
 
+import java.time.Instant
 import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 import scala.concurrent.duration.FiniteDuration
@@ -26,7 +27,7 @@ import akka.actor.Cancellable
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   private[wordcount] val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
@@ -37,7 +38,7 @@ class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
 
   private var scheduler: Cancellable = null
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
       new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
index cef9337..8b50890 100644
--- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
+++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming.examples.wordcount
 
+import java.time.Instant
+
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 
@@ -29,7 +31,6 @@ import org.scalatest.{Matchers, WordSpec}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 
 class SplitSpec extends WordSpec with Matchers {
 
@@ -47,10 +48,10 @@ class SplitSpec extends WordSpec with Matchers {
 
       val conf = UserConfig.empty
       val split = new Split(taskContext, conf)
-      split.onStart(StartTime(0))
+      split.onStart(Instant.EPOCH)
       mockTaskActor.expectMsgType[Message]
 
-      val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").filter(_.nonEmpty).length
+      val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").count(_.nonEmpty)
 
       split.onNext(Message("next"))
       verify(taskContext, times(expectedWordCount)).output(anyObject())

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
index e42d696..17e1765 100644
--- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
+++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming.examples.wordcount
 
+import java.time.Instant
+
 import org.scalacheck.Gen
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
@@ -24,7 +26,6 @@ import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 
 class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
   val stringGenerator = Gen.alphaStr
@@ -39,7 +40,7 @@ class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndA
 
     val sum = new Sum(taskContext, conf)
 
-    sum.onStart(StartTime(0))
+    sum.onStart(Instant.EPOCH)
 
     forAll(stringGenerator) { txt =>
       wordcount += 1

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
index 1d3048e..e3b45fb 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.experiments.storm.processor
 
+import java.time.Instant
 import java.util.concurrent.TimeUnit
 import scala.concurrent.duration.Duration
 
@@ -46,7 +47,7 @@ private[storm] class StormProcessor(gearpumpBolt: GearpumpBolt,
 
   private val freqOpt = gearpumpBolt.getTickFrequency
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     gearpumpBolt.start(startTime)
     freqOpt.foreach(scheduleTick)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
index 5d4a6a2..b92f037 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.experiments.storm.producer
 
+import java.time.Instant
 import java.util.concurrent.TimeUnit
 
 import akka.actor.Actor.Receive
@@ -48,7 +49,7 @@ private[storm] class StormProducer(gearpumpSpout: GearpumpSpout,
 
   private val timeoutMillis = gearpumpSpout.getMessageTimeout
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     gearpumpSpout.start(startTime)
     if (gearpumpSpout.ackEnabled) {
       getCheckpointClock

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
index d0f2949..a8e061c 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
@@ -19,6 +19,7 @@
 package org.apache.gearpump.experiments.storm.topology
 
 import java.io.{File, FileOutputStream, IOException}
+import java.time.Instant
 import java.util
 import java.util.jar.JarFile
 import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
@@ -40,7 +41,7 @@ import org.apache.gearpump.experiments.storm.util.StormConstants._
 import org.apache.gearpump.experiments.storm.util.StormUtil._
 import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUtil}
 import org.apache.gearpump.streaming.DAG
-import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext, StartTime}
+import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext}
 import org.apache.gearpump.util.{Constants, LogUtil}
 import org.apache.gearpump.{Message, TimeStamp}
 import org.slf4j.Logger
@@ -57,7 +58,7 @@ trait GearpumpStormComponent {
    * invoked at Task.onStart
    * @param startTime task start time
    */
-  def start(startTime: StartTime): Unit
+  def start(startTime: Instant): Unit
 
   /**
    * invoked at Task.onNext
@@ -123,7 +124,7 @@ object GearpumpStormComponent {
 
     private var collector: StormSpoutOutputCollector = null
 
-    override def start(startTime: StartTime): Unit = {
+    override def start(startTime: Instant): Unit = {
       val dag = getDAG(taskContext.appMaster)
       val topologyContext = getTopologyContext(dag, taskContext.taskId)
       collector = getOutputCollector(taskContext, topologyContext)
@@ -206,7 +207,7 @@ object GearpumpStormComponent {
     private var generalTopologyContext: GeneralTopologyContext = null
     private var tickTuple: Tuple = null
 
-    override def start(startTime: StartTime): Unit = {
+    override def start(startTime: Instant): Unit = {
       val dag = getDAG(taskContext.appMaster)
       topologyContext = getTopologyContext(dag, taskContext.taskId)
       generalTopologyContext = getGeneralTopologyContext(dag)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
index 2111df6..9bbac58 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
@@ -18,11 +18,12 @@
 
 package org.apache.gearpump.experiments.storm.processor
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{Matchers, WordSpec}
@@ -31,7 +32,7 @@ class StormProcessorSpec extends WordSpec with Matchers with MockitoSugar {
 
   "StormProcessor" should {
     "start GearpumpSpout onStart" in {
-      val startTime = mock[StartTime]
+      val startTime = Instant.EPOCH
       val gearpumpBolt = mock[GearpumpBolt]
       when(gearpumpBolt.getTickFrequency).thenReturn(None)
       val taskContext = MockUtil.mockTaskContext

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
index 39a008f..ee89a4a 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
@@ -18,12 +18,13 @@
 
 package org.apache.gearpump.experiments.storm.producer
 
+import java.time.Instant
+
 import akka.testkit.TestProbe
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{Matchers, WordSpec}
@@ -32,7 +33,7 @@ class StormProducerSpec extends WordSpec with Matchers with MockitoSugar {
 
   "StormProducer" should {
     "start GearpumpSpout onStart" in {
-      val startTime = mock[StartTime]
+      val startTime = Instant.EPOCH
       val gearpumpSpout = mock[GearpumpSpout]
       when(gearpumpSpout.getMessageTimeout).thenReturn(None)
       val taskContext = MockUtil.mockTaskContext

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
index bdea50c..0891070 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.gearpump.experiments.storm.topology
 
+import java.time.Instant
 import java.util.{Map => JMap}
 
 import akka.actor.ActorRef
@@ -26,7 +27,7 @@ import backtype.storm.tuple.Tuple
 import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
 import org.apache.gearpump.experiments.storm.util.StormOutputCollector
-import org.apache.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
+import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
 import org.apache.gearpump.streaming.{DAG, MockUtil}
 import org.apache.gearpump.{Message, TimeStamp}
 import org.mockito.Matchers.{anyObject, eq => mockitoEq}
@@ -59,8 +60,7 @@ class GearpumpStormComponentSpec
       getOutputCollector, ackEnabled = false, taskContext)
 
     // Start
-    val startTime = mock[StartTime]
-    gearpumpSpout.start(startTime)
+    gearpumpSpout.start(Instant.EPOCH)
 
     verify(spout).open(mockitoEq(config), mockitoEq(topologyContext),
       anyObject[SpoutOutputCollector])
@@ -100,8 +100,7 @@ class GearpumpStormComponentSpec
         getGeneralTopologyContext, getOutputCollector, getTickTuple, taskContext)
 
       // Start
-      val startTime = mock[StartTime]
-      gearpumpBolt.start(startTime)
+      gearpumpBolt.start(Instant.EPOCH)
 
       verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext),
         anyObject[OutputCollector])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
index ef383ad..b92b2e1 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
@@ -79,7 +79,6 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
     "get target processors from source id" in {
       val stormTopology = TopologyUtil.getTestTopology
       implicit val system = MockUtil.system
-      val sysConfig = new JHashMap[AnyRef, AnyRef]
       val gearpumpStormTopology =
         GearpumpStormTopology("app", stormTopology, null)
       val targets0 = gearpumpStormTopology.getTargets("1")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
index da08b04..314eae8 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.streaming.kafka.lib.source
 
+import java.time.Instant
 import java.util.Properties
 
 import com.twitter.bijection.Injection
@@ -87,11 +88,11 @@ abstract class AbstractKafkaSource(
     this.checkpointStoreFactory = Some(checkpointStoreFactory)
   }
 
-  override def open(context: TaskContext, startTime: TimeStamp): Unit = {
+  override def open(context: TaskContext, startTime: Instant): Unit = {
     import context.{parallelism, taskId}
 
     LOG.info("KafkaSource opened at start time {}", startTime)
-    this.startTime = startTime
+    this.startTime = startTime.toEpochMilli
     val topicList = topic.split(",", -1).toList
     val grouper = config.getConfiguredInstance(KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG,
       classOf[PartitionGrouper])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
index e40276f..6ccb231 100644
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.streaming.kafka
 
+import java.time.Instant
 import java.util.Properties
 
 import com.twitter.bijection.Injection
@@ -42,7 +43,7 @@ import org.scalatest.{Matchers, PropSpec}
 
 class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
-  val startTimeGen = Gen.choose[Long](0L, 100L)
+  val startTimeGen = Gen.choose[Long](0L, 100L).map(Instant.ofEpochMilli)
   val offsetGen = Gen.choose[Long](0L, 100L)
   val topicAndPartitionGen = for {
     topic <- Gen.alphaStr suchThat (_.nonEmpty)
@@ -51,7 +52,7 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
   val tpsGen = Gen.listOf[TopicAndPartition](topicAndPartitionGen) suchThat (_.nonEmpty)
 
   property("KafkaSource open should not recover without checkpoint") {
-    forAll(startTimeGen, tpsGen) { (startTime: Long, tps: List[TopicAndPartition]) =>
+    forAll(startTimeGen, tpsGen) { (startTime: Instant, tps: List[TopicAndPartition]) =>
       val taskContext = MockUtil.mockTaskContext
       val fetchThread = mock[FetchThread]
       val kafkaClient = mock[KafkaClient]
@@ -84,7 +85,7 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
 
   property("KafkaSource open should recover with checkpoint") {
     forAll(startTimeGen, offsetGen, tpsGen) {
-      (startTime: Long, offset: Long, tps: List[TopicAndPartition]) =>
+      (startTime: Instant, offset: Long, tps: List[TopicAndPartition]) =>
         val taskContext = MockUtil.mockTaskContext
         val checkpointStoreFactory = mock[CheckpointStoreFactory]
         val checkpointStores = tps.map(_ -> mock[CheckpointStore]).toMap
@@ -115,7 +116,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
         checkpointStores.foreach { case (tp, store) =>
           when(checkpointStoreFactory.getCheckpointStore(
             KafkaConfig.getCheckpointStoreNameSuffix(tp))).thenReturn(store)
-          when(store.recover(startTime)).thenReturn(Some(Injection[Long, Array[Byte]](offset)))
+          when(store.recover(startTime.toEpochMilli))
+            .thenReturn(Some(Injection[Long, Array[Byte]](offset)))
         }
 
         source.setCheckpointStore(checkpointStoreFactory)