You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/08/15 15:04:33 UTC
[2/3] incubator-gearpump git commit: [GEARPUMP-188] use java.time.Instant for Task start time
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/js/transport.js
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/js/transport.js b/examples/streaming/transport/src/main/resources/transport/js/transport.js
deleted file mode 100644
index eef0fe9..0000000
--- a/examples/streaming/transport/src/main/resources/transport/js/transport.js
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-var myChart = echarts.init(document.getElementById("mychart"))
-
-echarts.util.mapData.params.params.football = {
- getGeoJson: function (callback) {
- $.ajax({
- url: "../svg/beijing.svg",
- dataType: 'xml',
- success: function (xml) {
- callback(xml)
- }
- });
- }
-}
-
-function updateRecords(tableId) {
- $.getJSON("records", function (json) {
- var tableStr = "<table class=\"dataintable\" style=\"margin-left: 5px;\">";
- tableStr += "<tr><th>Over Speed Vehicle ID</th><th>Speed</th><th>Location</th><th>Time</th></tr>";
- var records = json.records;
- for (var i = 0; i < Math.min(records.length, 20); i++) {
- var record = records[i];
- var vehicleId = record.vehicleId;
- var location = record.locationId.split("_");
- var speed = record.speed;
- var row = location[1];
- var column = location[2];
- var time = new Date(Number(record.timestamp)).toLocaleTimeString().replace(/^\D*/, '');
- tableStr += "<tr><td>" + vehicleId + "</td>";
- tableStr += "<td>" + speed + "km/h </td>"
- tableStr += "<td>(" + row + ", " + column + ")</td>";
- tableStr += "<td>" + time + "</td></tr>";
- }
- if (records.length < 20) {
- for (var i = records.length; i < 20; i++) {
- tableStr += "<tr><td></td>";
- tableStr += "<td> </td>"
- tableStr += "<td> </td>";
- tableStr += "<td> </td></tr>";
- }
- }
- tableStr += "</table>"
- document.getElementById(tableId).innerHTML = tableStr;
- }
- )
-}
-
-function initChart(chartid, vehicleId) {
- // \u57fa\u4e8e\u51c6\u5907\u597d\u7684dom\uff0c\u521d\u59cb\u5316echarts\u56fe\u8868
- $.getJSON("trace/" + vehicleId, function (json) {
- // \u4e3aecharts\u5bf9\u8c61\u52a0\u8f7d\u6570\u636e
- var records = json.records;
- var timeLine = new Array(records.length);
- var markPoints = new Array(records.length);
- var options_ = new Array(records.length - 2);
- for (var i = 0; i < records.length; i++) {
- var record = records[i];
- var vehicleId = record.vehicleId;
- var location = record.locationId.split("_");
- var row = location[1];
- var column = location[2];
- var time = new Date(Number(record.timeStamp)).toLocaleTimeString().replace(/^\D*/, '');
- timeLine[i] = time;
- var currentPonit = {name: "", value: i, geoCoord: [row * 90, column * 90]};
- markPoints[i] = currentPonit;
- }
- options_[0] =
- {
- title: {
- text: 'Vehicle trace'
- },
- tooltip: {
- trigger: 'item'
- },
- toolbox: {
- show: false,
- feature: {
- mark: {show: true},
- dataView: {show: true, readOnly: false},
- magicType: {show: true, type: ['line', 'bar']},
- restore: {show: true},
- saveAsImage: {show: true}
- }
- },
- series: [
- {
- name: 'Vehicle trace',
- type: 'map',
- mapType: 'football',
- mapLocation: {
- y: 30,
- height: 430
- },
- itemStyle: {
- normal: {label: {show: false}},
- emphasis: {label: {show: false}}
- },
- data: [
- {name: 'City', hoverable: false, itemStyle: {normal: {label: {show: false}}}}
- ],
- markPoint: {
- symbol: 'circle',
- symbolSize: 8,
- itemStyle: {
- normal: {
- borderWidth: 1,
- color: 'blue',
- lineStyle: {
- type: 'solid'
- }
- }
- },
- data: [markPoints[0]]
- },
- markLine: {
- smooth: true,
- effect: {
- show: true,
- scaleSize: 1.5,
- period: 1.5,
- color: '#fff'
- },
- itemStyle: {
- normal: {
- borderWidth: 2,
- color: 'red',
- lineStyle: {
- type: 'solid'
- }
- }
- },
- data: []
- }
- }
- ]
- }
- for (var i = 1; i < markPoints.length; i++) {
- options_[i] =
- {
- series: [
- {
- markPoint: {
- data: [markPoints[i]]
- },
- markLine: {
- data: []
- }
- }
- ]
- }
- }
- var option = {
- timeline: {
- type: 'number',
- playInterval: 500,
- autoPlay: true,
- data: timeLine
- },
- options: options_
- };
- myChart.setOption(option);
- });
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg b/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg
deleted file mode 100644
index 5342c24..0000000
--- a/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg
+++ /dev/null
@@ -1,199 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<!-- Generator: Adobe Illustrator 14.0.0, SVG Export Plug-In . SVG Version: 6.00 Build 43363) -->
-<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
-<svg version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px" width="1103px"
- height="1115px" viewBox="0 0 200 202" enable-background="new 0 0 1103 1115" xml:space="preserve">
-<g id="背景">
-
- <polygon fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 719,377 533,377 487,398 491,563 469,571 469,682 504,682 546,676 661,667 672,672 710,672 740,676 751,645 746,567 725,563
- 719,510 "/>
-
- <path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d="
- M359,533V402l-7-76l36-18l333-10l76,65v315l-38,41h-49l-37,6H534l-81,31h-37c-33.5-0.5-56.5-54.5-57-70V533z"/>
-
- <path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d="
- M252,571v44l11,39l16,41v71l7.5,27.5L290,811l11,13l16,1l447-4l48-10l51-45l7-25l8-70V370c1.5-12.5-4.5-24.5-11-32l-103-96l-25-15
- l-21-6l-392,17l-19,8l-17,24l-35,28l-4,10l4,38l-3,40v47l3,37l-3,40V571z"/>
-
- <path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d="
- M259,133l63-8l52-13l57-19h49l241,6l69,26h51c18.5,2.5,27.5,6.5,37,16l82,121l16,28l9,20l11,37l11,21l25,37l7,122l11,80v24v73l6,30
- l-6,17L944,857l-93,45l-46,62l-17,10l-49,1c-17.5,1.5-37.5,20.5-43,35l-19,52l-18,12l-84,27c-5.5,1.5-21.5,6.5-27-13l-6-37
- c-2.5-8.5-3.5-22.5-27-23l-151-18c-15.5-1.5-30.5,5.5-50,18l-23,12c-17.5,8.5-41.5-2.5-49-30l-15-60v-35c0,0-49.5-86.5-55-90
- s-35-9-35-9l-38-65l-5-23l-33-60l-6-22l13-46v-89l2-43l-2-63l10-37l19-111l-3-27c0.5-8.5,5.5-30.5,24-36l29-6l25-15h46l17-10l16-22
- L259,133z"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 1112,523 990,523 967,530 878,530 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="1030" y1="405" x2="1112" y2="406"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 1112,48 962,122 777.5,343.5 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 856,-7 856,10 848,31 797,81 788,93 763,186 706,298 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 559.5,303.5 565,270 547,207 422,-7 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- -10,1003 13,995 25,988 42,931 42,912 47,898 70,870 74,853 66,789 74,779 110,767 143,742 152,717 280,647 331,633 359.5,617.5
- "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 456,1115 449,1049 449,1022 467,986 466.5,977.5 455.5,868.5 460,842 460.5,754.5 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 1103,1115 1066,1044 1042,1011 1015,982 984,927 952,888 901,849 778,703 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="1036.5" y1="768.5" x2="1112" y2="810"/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="878" y1="671" x2="1112" y2="682"/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="1019" y1="-7" x2="955" y2="129"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- -10,533 540,536 671,531 878,530 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 0,468 132,468 168,457 175,457 201,470 212,470 360,470 422,474 458,474 494,472 539,472 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 539,477 590,477 595,474 623,474 630,470 732,470 753,476 763,477 825,502 843,503 929,502 984.5,496.5 1037,500 1091,500
- 1112,503 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 494,565 533.5,564.5 556,562 616,562 626,559 670,559 680,562 725,562 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 1030,405 971,405 885,435 683,435 589.5,435.5 539,439 523,439 372,439 254,439 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 360,618 389,602 403,600 469,602 591.5,601.5 625.5,591.5 652,589 699,588 987,589 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 716,377 748,377 778,344 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- -10,596 6,603 47,603 63,588 91,572 394,572 472,571 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 725,544 689,544 674,559 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 539,406 474.5,410.5 458,415 424.5,417.5 400.5,414.5 378,413 305.5,379.5 283.5,385.5 254,384 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 878,530 864,541 725,549 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 213,688 231,694 278,694 397,682 467,682 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="469" y1="682" x2="460.5" y2="754.5"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 308,755 313,638 313,496 316,487 315.5,420.5 313,399 313,344 282,280 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 352,327 322,338 299,344 254,344 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 162,-7 206,13 240,48 287,120 294,175 312,193 312,206 324,219.5 330,280 346,298 352,327 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 332.5,-9 357,41 364,70 369,109 379,124 381,155 379,159 373,159 370,168 375,265 391,322 401,414 401,474 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 477,93 482,229 486,361 489,372 490,399 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="560" y1="304" x2="556" y2="377"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 537,377 542,536 542,694 539,703 535,716 535,727 530,825 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 613,596 615,671 624,924 666,921 666,930 670,935 678,991 755,1061 766,1115 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="600" y1="227" x2="604" y2="377"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 642,434 640,377 638,302 638,227 642,99 653,10 653,0 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 1037,769 951,709 951,677 870,622 678,623 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 653,11 670,58 668,85 668,110 664,129 664,146 668,176 667,199 667,227 672,369 664,369 664,377 672,609 678,622 680,725 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="878" y1="671" x2="745" y2="671"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 1112,470 973,472 965,474 770,474 763,478 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="845" y1="434" x2="842" y2="696"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 763,530 763,383 751,372 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 812.5,125 816,205 861,243 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 667,172 619,174 578,175 494,178 434,180 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 720,271 709,269 392,272 340,275 329,279 282,283 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 826,797 837,810 840.5,831.5 885.5,884.5 967.16,1014.78 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 888,1003 1025,917 1050,905 1064.5,888.5 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 1071,1051 1051,1065 1024,1081 987,1065 976,1051 961,1040 898,1017 861,959 851,935 837,923 813,888 763,838 757,825 751,769
- 749,737 747,682 737,677 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 1009,988 967.28,1015.13 933.5,1117.5 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 0,984 9,974 15,957 15,950 15,939 26,909 34,901 45,897 60,878 58,867 63,856 64,844 61,825 64,818 61,776 104,769 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 917,866 845,900 797,919 781,915 742,923 725,923 667,930 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- -3,1076 93,1076 148,1056 248,1055 270,1047 333,1009 458,909 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#14A97E" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 239,129 241,154 251,172 270,176 292,176 295,187 312,193 346,192 352,184 371,184 374,237 377,272 391,327 401,414 422,418
- 449,416 472,411 530,406 539,414 542,677 560,703 535,725 530,810 "/>
-</g>
-<g id="层_1" display="none">
-
- <polyline display="inline" fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#0096C0" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 797,623.5 797,362.5 778,345.5 792,328.5 725,273.5 706,270.5 386.5,273 340,277.5 333,279.5 299,282.5 "/>
-</g>
-</svg>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/transport.html
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/transport.html b/examples/streaming/transport/src/main/resources/transport/transport.html
deleted file mode 100644
index baee931..0000000
--- a/examples/streaming/transport/src/main/resources/transport/transport.html
+++ /dev/null
@@ -1,88 +0,0 @@
-<!DOCTYPE html>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<html>
-
-<head>
- <meta charset="utf-8">
- <link rel=stylesheet type=text/css href="css/custom.css">
- <script src="http://echarts.baidu.com/build/source/echarts-all.js"></script>
- <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
-</head>
-
-<body style="background-color:#F2F2F2">
-<div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;">
- <div style="height:0px"></div>
- <div id="header">
- <div
- style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
- Big Data Transport Monitoring Demo
- </div>
- </div>
- <div id="body">
- <div id="Menu">
- <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;">
- <!-- form to post to accompany to get accompanying cars -->
-
- <table style="width:100%">
- <tr>
- <td class="sidebar-label">Vehicle Id:</td>
- </tr>
- <tr>
- <td class="sidebar-label"></td>
- </tr>
- <tr>
- <td style="vertical-align:top;">
- <input id="vehicleId" class="sidebar" type="text" name="vehicleId"/>
- </td>
- </tr>
- </table>
- <div class="splitter"></div>
- <div>
- <button id="search" onclick="search_onclick()">Search</button>
- </div>
- </div>
- </div>
- <div id="content"
- style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
- <div
- style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
- Vehicle Trace:
- </div>
- <div style="height:7px;background-color:#92BDF2;"></div>
-
- <div id="mychart"></div>
-
- <div id="mytable"></div>
- </div>
- </div>
- <div id="footer">
- Big Data Team @ Intel
- </div>
- <script src="js/transport.js"></script>
- <script type="text/javascript">
- function search_onclick() {
- var vehicleId = document.getElementById('vehicleId').value
- initChart("mychart", vehicleId)
- }
- setInterval(updateRecords, 1000, "mytable")
- </script>
-</div>
-</body>
-</html>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
deleted file mode 100644
index 0aaf72c..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-case class LocationInfo(id: String, row: Int, column: Int)
-
-// scalastyle:off equals.hash.code
-case class PassRecord(vehicleId: String, locationId: String, timeStamp: Long) {
- override def hashCode: Int = vehicleId.hashCode
-}
-// scalastyle:on equals.hash.code
-
-case class GetTrace(vehicleId: String)
-
-case class VehicleTrace(records: Array[PassRecord])
-
-case class OverSpeedReport(vehicleId: String, speed: String, timestamp: Long, locationId: String)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
deleted file mode 100644
index 555e850..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import scala.concurrent.duration._
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-
-class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import taskContext.{output, parallelism, scheduleOnce, taskId}
- private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
- private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism
- private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
- private val mockCity = new MockCity(citySize)
- private val recordGenerators: Array[PassRecordGenerator] =
- PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold)
-
- override def onStart(startTime: StartTime): Unit = {
- self ! Message("start", System.currentTimeMillis())
- }
-
- override def onNext(msg: Message): Unit = {
- recordGenerators.foreach(generator =>
- output(Message(generator.getNextPassRecord(), System.currentTimeMillis())))
- scheduleOnce(1.second)(self ! Message("continue", System.currentTimeMillis()))
- }
-
- private def getIdentifier(taskId: TaskId): String = {
- // scalastyle:off non.ascii.character.disallowed
- s"\u6caaA${taskId.processorId}${taskId.index}"
- // scalastyle:on non.ascii.character.disallowed
- }
-}
-
-object DataSource {
- final val VEHICLE_NUM = "vehicle.number"
- final val MOCK_CITY_SIZE = "mock.city.size"
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
deleted file mode 100644
index ff3b4b4..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-import akka.actor.Actor._
-import akka.actor.{Actor, ActorRefFactory, Props}
-import akka.io.IO
-import akka.pattern.ask
-import spray.can.Http
-import spray.http.StatusCodes
-import spray.json._
-import spray.routing.{HttpService, Route}
-import upickle.default.write
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import org.apache.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication}
-import org.apache.gearpump.util.Graph
-
-class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import system.dispatcher
- import taskContext.appMaster
-
- var inspector: (ProcessorId, ProcessorDescription) = null
- implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
- private var overSpeedRecords = List.empty[OverSpeedReport]
-
- override def onStart(startTime: StartTime): Unit = {
- val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
- StreamApplication.DAG).get)
- inspector = dag.processors.find { kv =>
- val (_, processor) = kv
- processor.taskClass == classOf[VelocityInspector].getName
- }.get
- taskContext.actorOf(Props(new WebServer))
- }
-
- override def onNext(msg: Message): Unit = {
- }
-
- override def receiveUnManagedMessage: Receive = {
- case getTrace@GetTrace(vehicleId: String) =>
- val parallism = inspector._2.parallelism
- val processorId = inspector._1
- val analyzerTaskId = TaskId(processorId, (vehicleId.hashCode & Integer.MAX_VALUE) % parallism)
- val requester = sender
- (appMaster ? LookupTaskActorRef(analyzerTaskId))
- .asInstanceOf[Future[TaskActorRef]].flatMap { task =>
- (task.task ? getTrace).asInstanceOf[Future[VehicleTrace]]
- }.map { trace =>
- LOG.info(s"reporting $trace")
- requester ! trace
- }
- case record@OverSpeedReport(vehicleId, speed, timestamp, locationId) =>
- LOG.info(s"vehicle $vehicleId is over speed, the speed is $speed km/h")
- overSpeedRecords :+= record
- case GetAllRecords =>
- sender ! QueryServer.OverSpeedRecords(overSpeedRecords.toArray.sortBy(_.timestamp))
- overSpeedRecords = List.empty[OverSpeedReport]
- case _ =>
- // Ignore
- }
-}
-
-object QueryServer {
- object GetAllRecords
-
- case class OverSpeedRecords(records: Array[OverSpeedReport])
-
- class WebServer extends Actor with HttpService {
-
- import context.dispatcher
- implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
- def actorRefFactory: ActorRefFactory = context
- implicit val system = context.system
-
- IO(Http) ! Http.Bind(self, interface = "0.0.0.0", port = 8080)
-
- override def receive: Receive = runRoute(webServer ~ staticRoute)
-
- def webServer: Route = {
- path("trace" / Segment) { vehicleId =>
- get {
- onComplete((context.parent ? GetTrace(vehicleId)).asInstanceOf[Future[VehicleTrace]]) {
- case Success(trace: VehicleTrace) =>
- val json = write(trace)
- complete(pretty(json))
- case Failure(ex) => complete(StatusCodes.InternalServerError,
- s"An error occurred: ${ex.getMessage}")
- }
- }
- } ~
- path("records") {
- get {
- onComplete((context.parent ? GetAllRecords).asInstanceOf[Future[OverSpeedRecords]]) {
- case Success(records: OverSpeedRecords) =>
- val json = write(records)
- complete(pretty(json))
- case Failure(ex) => complete(StatusCodes.InternalServerError,
- s"An error occurred: ${ex.getMessage}")
- }
- }
- }
- }
-
- val staticRoute = {
- pathEndOrSingleSlash {
- getFromResource("transport/transport.html")
- } ~
- pathPrefix("css") {
- get {
- getFromResourceDirectory("transport/css")
- }
- } ~
- pathPrefix("svg") {
- get {
- getFromResourceDirectory("transport/svg")
- }
- } ~
- pathPrefix("js") {
- get {
- getFromResourceDirectory("transport/js")
- }
- }
- }
-
- private def pretty(json: String): String = {
- json.parseJson.prettyPrint
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
deleted file mode 100644
index 5beb2e1..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
-import org.apache.gearpump.streaming.{Processor, StreamApplication}
-import org.apache.gearpump.util.Graph._
-import org.apache.gearpump.util.{AkkaApp, Graph}
-
-/** A city smart transportation streaming application */
-object Transport extends AkkaApp with ArgumentsParser {
- override val options: Array[(String, CLIOption[Any])] = Array(
- "source" -> CLIOption[Int]("<how many task to generate data>", required = false,
- defaultValue = Some(10)),
- "inspector" -> CLIOption[Int]("<how many over speed inspector>", required = false,
- defaultValue = Some(4)),
- "vehicle" -> CLIOption[Int]("<how many vehicles's to generate>", required = false,
- defaultValue = Some(1000)),
- "citysize" -> CLIOption[Int]("<the blocks number of the mock city>", required = false,
- defaultValue = Some(10)),
- "threshold" -> CLIOption[Int]("<overdrive threshold, km/h>", required = false,
- defaultValue = Some(60)))
-
- def application(config: ParseResult): StreamApplication = {
- val sourceNum = config.getInt("source")
- val inspectorNum = config.getInt("inspector")
- val vehicleNum = config.getInt("vehicle")
- val citysize = config.getInt("citysize")
- val threshold = config.getInt("threshold")
- val source = Processor[DataSource](sourceNum)
- val inspector = Processor[VelocityInspector](inspectorNum)
- val queryServer = Processor[QueryServer](1)
- val partitioner = new HashPartitioner
-
- val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
- withInt(DataSource.MOCK_CITY_SIZE, citysize).
- withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold).
- withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
- StreamApplication("transport", Graph(source ~ partitioner ~> inspector,
- Node(queryServer)), userConfig)
- }
-
- override def main(akkaConf: Config, args: Array[String]): Unit = {
- val config = parse(args)
- val context = ClientContext(akkaConf)
- implicit val system = context.system
- context.submit(application(config))
- context.close()
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
deleted file mode 100644
index 4d9bd04..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import java.util.concurrent.TimeUnit
-import scala.collection.immutable.Queue
-import scala.collection.mutable
-import scala.concurrent.Future
-
-import akka.actor.Actor._
-import akka.actor.ActorRef
-import akka.pattern.ask
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.examples.transport.generator.MockCity
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import org.apache.gearpump.streaming.{DAG, ProcessorDescription, StreamApplication}
-import org.apache.gearpump.util.Graph
-
-class VelocityInspector(taskContext: TaskContext, conf: UserConfig)
- extends Task(taskContext, conf) {
-
- import system.dispatcher
- import taskContext.appMaster
- implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
- private val passRecords = mutable.Map.empty[String, Queue[PassRecord]]
- private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get
- private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
- private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
- private val mockCity = new MockCity(citySize)
- private var queryServerActor: ActorRef = null
-
- override def onStart(startTime: StartTime): Unit = {
- val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
- StreamApplication.DAG).get)
- val queryServer = dag.processors.find { kv =>
- val (_, processor) = kv
- processor.taskClass == classOf[QueryServer].getName
- }.get
- val queryServerTaskId = TaskId(queryServer._1, 0)
- (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]]
- .map { task =>
- queryServerActor = task.task
- }
- }
-
- import org.apache.gearpump.streaming.examples.transport.VelocityInspector._
- override def onNext(msg: Message): Unit = {
- msg.msg match {
- case passRecord: PassRecord =>
- val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord])
- if (records.size > 0) {
- val velocity = getVelocity(passRecord, records.last)
- val formatted = "%.2f".format(velocity)
- if (velocity > overdriveThreshold) {
- if (velocity > fakePlateThreshold) {
- LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, " +
- s"the speed is $formatted km/h")
- }
- if (queryServerActor != null) {
- queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted,
- passRecord.timeStamp, passRecord.locationId)
- }
- }
- }
- passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM))
- }
- }
-
- override def receiveUnManagedMessage: Receive = {
- case GetTrace(vehicleId) =>
- val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord])
- sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp))
- }
-
- private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = {
- val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId)
- val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60)
- distanceInKm / timeInHour
- }
-
- private def getDistance(location1: String, location2: String): Long = {
- mockCity.getDistance(location1, location2)
- }
-}
-
-object VelocityInspector {
- final val OVER_DRIVE_THRESHOLD = "overdrive.threshold"
- final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold"
- final val RECORDS_NUM = 20
-
- class FiniteQueue[T](q: Queue[T]) {
- def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = {
- var result = q.enqueue(elem)
- while (result.size > maxSize) {
- result = result.dequeue._2
- }
- result
- }
- }
-
- import scala.language.implicitConversions
-
- implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
deleted file mode 100644
index 60e0bcf..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import scala.util.Random
-
-import org.apache.gearpump.streaming.examples.transport.generator.MockCity._
-
-class MockCity(size: Int) {
- private val random = new Random()
- private val directions = Array(UP, DOWN, LEFT, RIGHT)
-
- def nextLocation(currentLocationId: String): String = {
- val coordinate = idToCoordinate(currentLocationId)
- val direction = directions(random.nextInt(4))
- val newCoordinate = coordinate.addOffset(direction)
- if (inCity(newCoordinate)) {
- coordinateToId(newCoordinate)
- } else {
- nextLocation(currentLocationId)
- }
- }
-
- def getDistance(locationId1: String, locationId2: String): Long = {
- val coordinate1 = idToCoordinate(locationId1)
- val coordinate2 = idToCoordinate(locationId2)
- val blocks = Math.abs(coordinate1.row - coordinate2.row) +
- Math.abs(coordinate1.column - coordinate2.column)
- blocks * LENGTH_PER_BLOCK
- }
-
- def randomLocationId(): String = {
- val row = random.nextInt(size)
- val column = random.nextInt(size)
- coordinateToId(Coordinate(row, column))
- }
-
- private def coordinateToId(coordinate: Coordinate): String = {
- s"Id_${coordinate.row}_${coordinate.column}"
- }
-
- private def idToCoordinate(locationId: String): Coordinate = {
- val attr = locationId.split("_")
- val row = attr(1).toInt
- val column = attr(2).toInt
- Coordinate(row, column)
- }
-
- private def inCity(coordinate: Coordinate): Boolean = {
- coordinate.row >= 0 &&
- coordinate.row < size &&
- coordinate.column >= 0 &&
- coordinate.column < size
- }
-}
-
-object MockCity {
- // The length of the mock city, km
- final val LENGTH_PER_BLOCK = 5
- // The minimal speed, km/h
- final val MINIMAL_SPEED = 10
-
- final val UP = Coordinate(0, 1)
- final val DOWN = Coordinate(0, -1)
- final val LEFT = Coordinate(-1, 0)
- final val RIGHT = Coordinate(1, 0)
-
- case class Coordinate(row: Int, column: Int) {
- def addOffset(coordinate: Coordinate): Coordinate = {
- Coordinate(this.row + coordinate.row, this.column + coordinate.column)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
deleted file mode 100644
index e8c1c59..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import scala.util.Random
-
-import org.apache.gearpump.streaming.examples.transport.PassRecord
-import org.apache.gearpump.util.LogUtil
-
-class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) {
- private val LOG = LogUtil.getLogger(getClass)
- LOG.info(s"Generate pass record for vehicle $vehicleId")
- private var timeStamp = System.currentTimeMillis()
-
- private var locationId = city.randomLocationId()
- private val random = new Random()
- private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE
- private val (randomMin, randomRange) = {
- val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat
- val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat
- val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE
- val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES)
- val randomRange = upperBound - randomMin
- (randomMin.toInt, randomRange.toInt)
- }
-
- def getNextPassRecord(): PassRecord = {
- locationId = if (fakePlate) {
- city.randomLocationId()
- } else {
- city.nextLocation(locationId)
- }
- timeStamp += (random.nextInt(randomRange) + randomMin)
- PassRecord(vehicleId, locationId, timeStamp)
- }
-}
-
-object PassRecordGenerator {
- final val FAKE_PLATE_RATE = 0.01F
- final val OVERDRIVE_RATE = 0.05F
- final val TWOMINUTES = 2 * 60 * 1000
-
- def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int)
- : Array[PassRecordGenerator] = {
- var result = Map.empty[String, PassRecordGenerator]
- val digitsNum = (Math.log10(generatorNum) + 1).toInt
- for (i <- 1 to generatorNum) {
- val vehicleId = prefix + s"%0${digitsNum}d".format(i)
- val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold)
- result += vehicleId -> generator
- }
- result.values.toArray
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
deleted file mode 100644
index 1f525ae..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
-
-class DataSourceSpec extends FlatSpec with Matchers {
- it should "create the pass record" in {
- val vehicleNum = 2
- val context = MockUtil.mockTaskContext
-
- val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
- withInt(DataSource.MOCK_CITY_SIZE, 10).
- withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, 60).
- withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
-
- val source = new DataSource(context, userConfig)
- source.onStart(StartTime(0))
- source.onNext(Message("start"))
- verify(context, times(vehicleNum)).output(any[Message])
- source.onStop()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
deleted file mode 100644
index 2f83de5..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
-
-import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
-import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-
-class TransportSpec
- extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
-
- override def beforeAll {
- startActorSystem()
- }
-
- override def afterAll {
- shutdownActorSystem()
- }
-
- protected override def config = TestUtil.DEFAULT_CONFIG
-
- property("Transport should succeed to submit application with required arguments") {
- val requiredArgs = Array.empty[String]
- val optionalArgs = Array(
- "-source", "1",
- "-inspector", "1",
- "-vehicle", "100",
- "-citysize", "10",
- "-threshold", "60")
-
- val args = {
- Table(
- ("requiredArgs", "optionalArgs"),
- (requiredArgs, optionalArgs)
- )
- }
- val masterReceiver = createMockMaster()
- forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
- val args = requiredArgs ++ optionalArgs
-
- Future {
- Transport.main(masterConfig, args)
- }
- masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
- masterReceiver.reply(SubmitApplicationResult(Success(0)))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
deleted file mode 100644
index ba4eb2d..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class MockCitySpec extends PropSpec with PropertyChecks with Matchers {
-
- property("MockCity should maintain the location properly") {
- val city = new MockCity(10)
- val start = city.randomLocationId()
- val nextLocation = city.nextLocation(start)
- assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
deleted file mode 100644
index f0eebbf..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers {
-
- property("PassRecordGenerator should generate pass record") {
- val id = "test"
- val city = new MockCity(10)
- val generator = new PassRecordGenerator(id, city, 60)
- val passrecord1 = generator.getNextPassRecord()
- val passrecord2 = generator.getNextPassRecord()
- assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) ==
- MockCity.LENGTH_PER_BLOCK)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
index 76069c1..0a8fb4f 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
@@ -21,9 +21,10 @@ package org.apache.gearpump.streaming.examples.wordcountjava;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.javaapi.Task;
-import org.apache.gearpump.streaming.task.StartTime;
import org.apache.gearpump.streaming.task.TaskContext;
+import java.time.Instant;
+
public class Split extends Task {
public static String TEXT = "This is a good start for java! bingo! bingo! ";
@@ -37,7 +38,7 @@ public class Split extends Task {
}
@Override
- public void onStart(StartTime startTime) {
+ public void onStart(Instant startTime) {
self().tell(new Message("start", now()), self());
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
index 89c3b14..3daa6e0 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
@@ -21,10 +21,10 @@ package org.apache.gearpump.streaming.examples.wordcountjava;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.javaapi.Task;
-import org.apache.gearpump.streaming.task.StartTime;
import org.apache.gearpump.streaming.task.TaskContext;
import org.slf4j.Logger;
+import java.time.Instant;
import java.util.HashMap;
public class Sum extends Task {
@@ -37,7 +37,7 @@ public class Sum extends Task {
}
@Override
- public void onStart(StartTime startTime) {
+ public void onStart(Instant startTime) {
//skip
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
index ae63f10..af3c04c 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
@@ -18,16 +18,17 @@
package org.apache.gearpump.streaming.examples.wordcount
+import java.time.Instant
import java.util.concurrent.TimeUnit
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
self ! Message("start")
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
index c3fa82a..dbefc93 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.streaming.examples.wordcount
+import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
@@ -26,7 +27,7 @@ import akka.actor.Cancellable
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
private[wordcount] val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
@@ -37,7 +38,7 @@ class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
private var scheduler: Cancellable = null
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
index cef9337..8b50890 100644
--- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
+++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.streaming.examples.wordcount
+import java.time.Instant
+
import scala.concurrent.Await
import scala.concurrent.duration.Duration
@@ -29,7 +31,6 @@ import org.scalatest.{Matchers, WordSpec}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
class SplitSpec extends WordSpec with Matchers {
@@ -47,10 +48,10 @@ class SplitSpec extends WordSpec with Matchers {
val conf = UserConfig.empty
val split = new Split(taskContext, conf)
- split.onStart(StartTime(0))
+ split.onStart(Instant.EPOCH)
mockTaskActor.expectMsgType[Message]
- val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").filter(_.nonEmpty).length
+ val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").count(_.nonEmpty)
split.onNext(Message("next"))
verify(taskContext, times(expectedWordCount)).output(anyObject())
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
index e42d696..17e1765 100644
--- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
+++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.streaming.examples.wordcount
+import java.time.Instant
+
import org.scalacheck.Gen
import org.scalatest.prop.PropertyChecks
import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
@@ -24,7 +26,6 @@ import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
val stringGenerator = Gen.alphaStr
@@ -39,7 +40,7 @@ class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndA
val sum = new Sum(taskContext, conf)
- sum.onStart(StartTime(0))
+ sum.onStart(Instant.EPOCH)
forAll(stringGenerator) { txt =>
wordcount += 1
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
index 1d3048e..e3b45fb 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.experiments.storm.processor
+import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
@@ -46,7 +47,7 @@ private[storm] class StormProcessor(gearpumpBolt: GearpumpBolt,
private val freqOpt = gearpumpBolt.getTickFrequency
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
gearpumpBolt.start(startTime)
freqOpt.foreach(scheduleTick)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
index 5d4a6a2..b92f037 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.experiments.storm.producer
+import java.time.Instant
import java.util.concurrent.TimeUnit
import akka.actor.Actor.Receive
@@ -48,7 +49,7 @@ private[storm] class StormProducer(gearpumpSpout: GearpumpSpout,
private val timeoutMillis = gearpumpSpout.getMessageTimeout
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
gearpumpSpout.start(startTime)
if (gearpumpSpout.ackEnabled) {
getCheckpointClock
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
index d0f2949..a8e061c 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
@@ -19,6 +19,7 @@
package org.apache.gearpump.experiments.storm.topology
import java.io.{File, FileOutputStream, IOException}
+import java.time.Instant
import java.util
import java.util.jar.JarFile
import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
@@ -40,7 +41,7 @@ import org.apache.gearpump.experiments.storm.util.StormConstants._
import org.apache.gearpump.experiments.storm.util.StormUtil._
import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUtil}
import org.apache.gearpump.streaming.DAG
-import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext, StartTime}
+import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext}
import org.apache.gearpump.util.{Constants, LogUtil}
import org.apache.gearpump.{Message, TimeStamp}
import org.slf4j.Logger
@@ -57,7 +58,7 @@ trait GearpumpStormComponent {
* invoked at Task.onStart
* @param startTime task start time
*/
- def start(startTime: StartTime): Unit
+ def start(startTime: Instant): Unit
/**
* invoked at Task.onNext
@@ -123,7 +124,7 @@ object GearpumpStormComponent {
private var collector: StormSpoutOutputCollector = null
- override def start(startTime: StartTime): Unit = {
+ override def start(startTime: Instant): Unit = {
val dag = getDAG(taskContext.appMaster)
val topologyContext = getTopologyContext(dag, taskContext.taskId)
collector = getOutputCollector(taskContext, topologyContext)
@@ -206,7 +207,7 @@ object GearpumpStormComponent {
private var generalTopologyContext: GeneralTopologyContext = null
private var tickTuple: Tuple = null
- override def start(startTime: StartTime): Unit = {
+ override def start(startTime: Instant): Unit = {
val dag = getDAG(taskContext.appMaster)
topologyContext = getTopologyContext(dag, taskContext.taskId)
generalTopologyContext = getGeneralTopologyContext(dag)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
index 2111df6..9bbac58 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
@@ -18,11 +18,12 @@
package org.apache.gearpump.experiments.storm.processor
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{Matchers, WordSpec}
@@ -31,7 +32,7 @@ class StormProcessorSpec extends WordSpec with Matchers with MockitoSugar {
"StormProcessor" should {
"start GearpumpSpout onStart" in {
- val startTime = mock[StartTime]
+ val startTime = Instant.EPOCH
val gearpumpBolt = mock[GearpumpBolt]
when(gearpumpBolt.getTickFrequency).thenReturn(None)
val taskContext = MockUtil.mockTaskContext
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
index 39a008f..ee89a4a 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
@@ -18,12 +18,13 @@
package org.apache.gearpump.experiments.storm.producer
+import java.time.Instant
+
import akka.testkit.TestProbe
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{Matchers, WordSpec}
@@ -32,7 +33,7 @@ class StormProducerSpec extends WordSpec with Matchers with MockitoSugar {
"StormProducer" should {
"start GearpumpSpout onStart" in {
- val startTime = mock[StartTime]
+ val startTime = Instant.EPOCH
val gearpumpSpout = mock[GearpumpSpout]
when(gearpumpSpout.getMessageTimeout).thenReturn(None)
val taskContext = MockUtil.mockTaskContext
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
index bdea50c..0891070 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
@@ -17,6 +17,7 @@
*/
package org.apache.gearpump.experiments.storm.topology
+import java.time.Instant
import java.util.{Map => JMap}
import akka.actor.ActorRef
@@ -26,7 +27,7 @@ import backtype.storm.tuple.Tuple
import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
import org.apache.gearpump.experiments.storm.util.StormOutputCollector
-import org.apache.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
+import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
import org.apache.gearpump.streaming.{DAG, MockUtil}
import org.apache.gearpump.{Message, TimeStamp}
import org.mockito.Matchers.{anyObject, eq => mockitoEq}
@@ -59,8 +60,7 @@ class GearpumpStormComponentSpec
getOutputCollector, ackEnabled = false, taskContext)
// Start
- val startTime = mock[StartTime]
- gearpumpSpout.start(startTime)
+ gearpumpSpout.start(Instant.EPOCH)
verify(spout).open(mockitoEq(config), mockitoEq(topologyContext),
anyObject[SpoutOutputCollector])
@@ -100,8 +100,7 @@ class GearpumpStormComponentSpec
getGeneralTopologyContext, getOutputCollector, getTickTuple, taskContext)
// Start
- val startTime = mock[StartTime]
- gearpumpBolt.start(startTime)
+ gearpumpBolt.start(Instant.EPOCH)
verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext),
anyObject[OutputCollector])
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
index ef383ad..b92b2e1 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
@@ -79,7 +79,6 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
"get target processors from source id" in {
val stormTopology = TopologyUtil.getTestTopology
implicit val system = MockUtil.system
- val sysConfig = new JHashMap[AnyRef, AnyRef]
val gearpumpStormTopology =
GearpumpStormTopology("app", stormTopology, null)
val targets0 = gearpumpStormTopology.getTargets("1")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
index da08b04..314eae8 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.streaming.kafka.lib.source
+import java.time.Instant
import java.util.Properties
import com.twitter.bijection.Injection
@@ -87,11 +88,11 @@ abstract class AbstractKafkaSource(
this.checkpointStoreFactory = Some(checkpointStoreFactory)
}
- override def open(context: TaskContext, startTime: TimeStamp): Unit = {
+ override def open(context: TaskContext, startTime: Instant): Unit = {
import context.{parallelism, taskId}
LOG.info("KafkaSource opened at start time {}", startTime)
- this.startTime = startTime
+ this.startTime = startTime.toEpochMilli
val topicList = topic.split(",", -1).toList
val grouper = config.getConfiguredInstance(KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG,
classOf[PartitionGrouper])
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
index e40276f..6ccb231 100644
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.streaming.kafka
+import java.time.Instant
import java.util.Properties
import com.twitter.bijection.Injection
@@ -42,7 +43,7 @@ import org.scalatest.{Matchers, PropSpec}
class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
- val startTimeGen = Gen.choose[Long](0L, 100L)
+ val startTimeGen = Gen.choose[Long](0L, 100L).map(Instant.ofEpochMilli)
val offsetGen = Gen.choose[Long](0L, 100L)
val topicAndPartitionGen = for {
topic <- Gen.alphaStr suchThat (_.nonEmpty)
@@ -51,7 +52,7 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
val tpsGen = Gen.listOf[TopicAndPartition](topicAndPartitionGen) suchThat (_.nonEmpty)
property("KafkaSource open should not recover without checkpoint") {
- forAll(startTimeGen, tpsGen) { (startTime: Long, tps: List[TopicAndPartition]) =>
+ forAll(startTimeGen, tpsGen) { (startTime: Instant, tps: List[TopicAndPartition]) =>
val taskContext = MockUtil.mockTaskContext
val fetchThread = mock[FetchThread]
val kafkaClient = mock[KafkaClient]
@@ -84,7 +85,7 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
property("KafkaSource open should recover with checkpoint") {
forAll(startTimeGen, offsetGen, tpsGen) {
- (startTime: Long, offset: Long, tps: List[TopicAndPartition]) =>
+ (startTime: Instant, offset: Long, tps: List[TopicAndPartition]) =>
val taskContext = MockUtil.mockTaskContext
val checkpointStoreFactory = mock[CheckpointStoreFactory]
val checkpointStores = tps.map(_ -> mock[CheckpointStore]).toMap
@@ -115,7 +116,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
checkpointStores.foreach { case (tp, store) =>
when(checkpointStoreFactory.getCheckpointStore(
KafkaConfig.getCheckpointStoreNameSuffix(tp))).thenReturn(store)
- when(store.recover(startTime)).thenReturn(Some(Injection[Long, Array[Byte]](offset)))
+ when(store.recover(startTime.toEpochMilli))
+ .thenReturn(Some(Injection[Long, Array[Byte]](offset)))
}
source.setCheckpointStore(checkpointStoreFactory)