You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:49 UTC
[39/49] incubator-gearpump git commit: GEARPUMP-11, fix code style
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala
index 91ba36e..638dc4e 100644
--- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala
+++ b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/main/Stock.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,51 +16,60 @@
* limitations under the License.
*/
-
package io.gearpump.streaming.examples.stock.main
import akka.actor.ActorSystem
-import io.gearpump.streaming.{StreamApplication, Processor}
-import io.gearpump.streaming.examples.stock.{Crawler, QueryServer, Analyzer, StockMarket}
+import org.slf4j.Logger
+
import io.gearpump.cluster.UserConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import io.gearpump.partitioner.HashPartitioner
-import StockMarket.ServiceHour
+import io.gearpump.streaming.examples.stock.StockMarket.ServiceHour
+import io.gearpump.streaming.examples.stock.{Analyzer, Crawler, QueryServer, StockMarket}
+import io.gearpump.streaming.{Processor, StreamApplication}
import io.gearpump.transport.HostPort
import io.gearpump.util.Graph.Node
import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-import org.slf4j.Logger
+/** Tracks the China's stock market index change */
object Stock extends AkkaApp with ArgumentsParser {
private val LOG: Logger = LogUtil.getLogger(getClass)
override val options: Array[(String, CLIOption[Any])] = Array(
- "crawler"-> CLIOption[Int]("<how many fetcher to get data from remote>", required = false, defaultValue = Some(10)),
- "analyzer"-> CLIOption[Int]("<parallism of analyzer>", required = false, defaultValue = Some(1)),
- "proxy" -> CLIOption[String]("proxy setting host:port, for example: 127.0.0.1:8443", required = false, defaultValue = Some("")))
+ "crawler" -> CLIOption[Int]("<how many fetcher to get data from remote>",
+ required = false, defaultValue = Some(10)),
+ "analyzer" -> CLIOption[Int]("<parallism of analyzer>",
+ required = false, defaultValue = Some(1)),
+ "proxy" -> CLIOption[String]("proxy setting host:port, for example: 127.0.0.1:8443",
+ required = false, defaultValue = Some("")))
- def crawler(config: ParseResult)(implicit system: ActorSystem) : StreamApplication = {
+ def crawler(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
val crawler = Processor[Crawler](config.getInt("crawler"))
val analyzer = Processor[Analyzer](config.getInt("analyzer"))
val queryServer = Processor[QueryServer](1)
val proxySetting = config.getString("proxy")
- val proxy = if (proxySetting.isEmpty) {null } else HostPort(proxySetting)
+ val proxy = if (proxySetting.isEmpty) {
+ null
+ } else HostPort(proxySetting)
val stockMarket = new StockMarket(new ServiceHour(true), proxy)
val stocks = stockMarket.getStockIdList
+ // scalastyle:off println
Console.println(s"Successfully fetched stock id for ${stocks.length} stocks")
+ // scalastyle:on println
- val userConfig = UserConfig.empty.withValue("StockId", stocks).withValue[StockMarket](classOf[StockMarket].getName, stockMarket)
+ val userConfig = UserConfig.empty.withValue("StockId", stocks)
+ .withValue[StockMarket](classOf[StockMarket].getName, stockMarket)
val partitioner = new HashPartitioner
val p1 = crawler ~ partitioner ~> analyzer
val p2 = Node(queryServer)
val graph = Graph(p1, p2)
val app = StreamApplication("stock_direct_analyzer", graph, userConfig
- )
+ )
app
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/README.md b/examples/streaming/transport/README.md
new file mode 100644
index 0000000..fc9bdfe
--- /dev/null
+++ b/examples/streaming/transport/README.md
@@ -0,0 +1,3 @@
+What is this?
+=============
+A smart transportation example which simulate a city with millions of cars.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/resources/transport/css/custom.css
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/custom.css b/examples/streaming/transport/src/main/resources/transport/css/custom.css
index 5b19ac7..f324b6a 100644
--- a/examples/streaming/transport/src/main/resources/transport/css/custom.css
+++ b/examples/streaming/transport/src/main/resources/transport/css/custom.css
@@ -1,16 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
.ui-datepicker {
font-size: 11px;
- }
+}
+
.sidebar-label {
- font-size:15px;
+ font-size: 15px;
font-family: calibri, Arial, Helvetica, sans-serif;
}
-
+
.help {
- font-size:12px;
+ font-size: 12px;
font-family: calibri, Arial, Helvetica, sans-serif;
}
-
+
div.splitter {
margin: 12px 0px 7px 0px;
clear: both;
@@ -18,13 +37,13 @@ div.splitter {
}
input.sidebar {
- width:165px
+ width: 165px
}
select.sidebar {
- width:198px
+ width: 198px
}
-
+
table.dataintable {
font-family: calibri, Arial, Helvetica, sans-serif;
font-size: 15px;
@@ -48,49 +67,49 @@ table.dataintable td {
border: 1px solid #AAA;
}
-#search{
- width:100px;
- height:25px;
- position:relative;
- left:0px;
- top:5px;
+#search {
+ width: 100px;
+ height: 25px;
+ position: relative;
+ left: 0px;
+ top: 5px;
}
-#mytable{
- width:100%;
- height:300;
- float:left;
+#mytable {
+ width: 100%;
+ height: 300;
+ float: left;
}
-#mychart{
- height:400px;
- width:100%;
+#mychart {
+ height: 400px;
+ width: 100%;
}
-#Menu{
- height:100%;
- width:245px;
- float:left;
+#Menu {
+ height: 100%;
+ width: 245px;
+ float: left;
}
-#header{
- height:115px;
+#header {
+ height: 115px;
background-image: url(header.png);
}
-#body{
- height:100%;
- width:100%;
+#body {
+ height: 100%;
+ width: 100%;
background-image: url(body.png);
- background-size:100% 100%;
+ background-size: 100% 100%;
}
-#footer{
- color:white;
- height:70px;
- line-height:70px;
- text-align:middle;
- clear:both;
- text-align:center;
+#footer {
+ color: white;
+ height: 70px;
+ line-height: 70px;
+ text-align: middle;
+ clear: both;
+ text-align: center;
background-image: url(foot.png);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/resources/transport/js/transport.js
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/js/transport.js b/examples/streaming/transport/src/main/resources/transport/js/transport.js
index 35d9ea2..eef0fe9 100644
--- a/examples/streaming/transport/src/main/resources/transport/js/transport.js
+++ b/examples/streaming/transport/src/main/resources/transport/js/transport.js
@@ -1,162 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
var myChart = echarts.init(document.getElementById("mychart"))
echarts.util.mapData.params.params.football = {
- getGeoJson: function (callback) {
- $.ajax({
- url: "../svg/beijing.svg",
- dataType: 'xml',
- success: function(xml) {
- callback(xml)
- }
- });
- }
+ getGeoJson: function (callback) {
+ $.ajax({
+ url: "../svg/beijing.svg",
+ dataType: 'xml',
+ success: function (xml) {
+ callback(xml)
+ }
+ });
+ }
}
function updateRecords(tableId) {
- $.getJSON( "records", function( json ) {
- var tableStr = "<table class=\"dataintable\" style=\"margin-left: 5px;\">";
- tableStr += "<tr><th>Over Speed Vehicle ID</th><th>Speed</th><th>Location</th><th>Time</th></tr>";
- var records = json.records;
- for(var i = 0; i < Math.min(records.length, 20); i++) {
- var record = records[i];
- var vehicleId = record.vehicleId;
- var location = record.locationId.split("_");
- var speed = record.speed;
- var row = location[1];
- var column = location[2];
- var time = new Date(Number(record.timestamp)).toLocaleTimeString().replace(/^\D*/,'');
- tableStr += "<tr><td>" + vehicleId + "</td>";
- tableStr += "<td>" + speed + "km/h </td>"
- tableStr += "<td>(" + row + ", "+ column + ")</td>";
- tableStr += "<td>" + time + "</td></tr>";
- }
- if(records.length < 20) {
- for(var i = records.length; i < 20; i++) {
+ $.getJSON("records", function (json) {
+ var tableStr = "<table class=\"dataintable\" style=\"margin-left: 5px;\">";
+ tableStr += "<tr><th>Over Speed Vehicle ID</th><th>Speed</th><th>Location</th><th>Time</th></tr>";
+ var records = json.records;
+ for (var i = 0; i < Math.min(records.length, 20); i++) {
+ var record = records[i];
+ var vehicleId = record.vehicleId;
+ var location = record.locationId.split("_");
+ var speed = record.speed;
+ var row = location[1];
+ var column = location[2];
+ var time = new Date(Number(record.timestamp)).toLocaleTimeString().replace(/^\D*/, '');
+ tableStr += "<tr><td>" + vehicleId + "</td>";
+ tableStr += "<td>" + speed + "km/h </td>"
+ tableStr += "<td>(" + row + ", " + column + ")</td>";
+ tableStr += "<td>" + time + "</td></tr>";
+ }
+ if (records.length < 20) {
+ for (var i = records.length; i < 20; i++) {
tableStr += "<tr><td></td>";
tableStr += "<td> </td>"
tableStr += "<td> </td>";
tableStr += "<td> </td></tr>";
- }
- }
- tableStr += "</table>"
- document.getElementById(tableId).innerHTML = tableStr;
- }
- )
+ }
+ }
+ tableStr += "</table>"
+ document.getElementById(tableId).innerHTML = tableStr;
+ }
+ )
}
function initChart(chartid, vehicleId) {
- // 基于准备好的dom,初始化echarts图表
- $.getJSON( "trace/" + vehicleId, function( json ) {
- // 为echarts对象加载数据
- var records = json.records;
- var timeLine = new Array(records.length);
- var markPoints = new Array(records.length);
- var options_ = new Array(records.length - 2);
- for(var i = 0; i < records.length; i++) {
- var record = records[i];
- var vehicleId = record.vehicleId;
- var location = record.locationId.split("_");
- var row = location[1];
- var column = location[2];
- var time = new Date(Number(record.timeStamp)).toLocaleTimeString().replace(/^\D*/,'');
- timeLine[i] = time;
- var currentPonit = {name: "", value: i, geoCoord:[row * 90, column * 90]};
- markPoints[i] = currentPonit;
+ // 基于准备好的dom,初始化echarts图表
+ $.getJSON("trace/" + vehicleId, function (json) {
+ // 为echarts对象加载数据
+ var records = json.records;
+ var timeLine = new Array(records.length);
+ var markPoints = new Array(records.length);
+ var options_ = new Array(records.length - 2);
+ for (var i = 0; i < records.length; i++) {
+ var record = records[i];
+ var vehicleId = record.vehicleId;
+ var location = record.locationId.split("_");
+ var row = location[1];
+ var column = location[2];
+ var time = new Date(Number(record.timeStamp)).toLocaleTimeString().replace(/^\D*/, '');
+ timeLine[i] = time;
+ var currentPonit = {name: "", value: i, geoCoord: [row * 90, column * 90]};
+ markPoints[i] = currentPonit;
+ }
+ options_[0] =
+ {
+ title: {
+ text: 'Vehicle trace'
+ },
+ tooltip: {
+ trigger: 'item'
+ },
+ toolbox: {
+ show: false,
+ feature: {
+ mark: {show: true},
+ dataView: {show: true, readOnly: false},
+ magicType: {show: true, type: ['line', 'bar']},
+ restore: {show: true},
+ saveAsImage: {show: true}
}
- options_[0] =
- {
- title : {
- text: 'Vehicle trace'
- },
- tooltip : {
- trigger: 'item'
- },
- toolbox: {
- show : false,
- feature : {
- mark : {show: true},
- dataView : {show: true, readOnly: false},
- magicType : {show: true, type: ['line', 'bar']},
- restore : {show: true},
- saveAsImage : {show: true}
+ },
+ series: [
+ {
+ name: 'Vehicle trace',
+ type: 'map',
+ mapType: 'football',
+ mapLocation: {
+ y: 30,
+ height: 430
+ },
+ itemStyle: {
+ normal: {label: {show: false}},
+ emphasis: {label: {show: false}}
+ },
+ data: [
+ {name: 'City', hoverable: false, itemStyle: {normal: {label: {show: false}}}}
+ ],
+ markPoint: {
+ symbol: 'circle',
+ symbolSize: 8,
+ itemStyle: {
+ normal: {
+ borderWidth: 1,
+ color: 'blue',
+ lineStyle: {
+ type: 'solid'
+ }
}
},
- series : [
- {
- name: 'Vehicle trace',
- type: 'map',
- mapType: 'football',
- mapLocation:{
- y: 30,
- height: 430
- },
- itemStyle:{
- normal:{label:{show:false}},
- emphasis:{label:{show:false}}
- },
- data:[
- {name: 'City', hoverable: false, itemStyle:{normal:{label:{show:false}}}}
- ],
- markPoint : {
- symbol:'circle',
- symbolSize : 8,
- itemStyle : {
- normal: {
- borderWidth:1,
- color: 'blue',
- lineStyle: {
- type: 'solid'
- }
- }
- },
- data: [markPoints[0]]
- },
- markLine : {
- smooth:true,
- effect : {
- show: true,
- scaleSize: 1.5,
- period: 1.5,
- color: '#fff'
- },
- itemStyle : {
- normal: {
- borderWidth:2,
- color: 'red',
- lineStyle: {
- type: 'solid'
- }
- }
- },
- data: []
+ data: [markPoints[0]]
+ },
+ markLine: {
+ smooth: true,
+ effect: {
+ show: true,
+ scaleSize: 1.5,
+ period: 1.5,
+ color: '#fff'
+ },
+ itemStyle: {
+ normal: {
+ borderWidth: 2,
+ color: 'red',
+ lineStyle: {
+ type: 'solid'
}
}
- ]
+ },
+ data: []
}
- for(var i = 1; i < markPoints.length; i++){
- options_[i] =
+ }
+ ]
+ }
+ for (var i = 1; i < markPoints.length; i++) {
+ options_[i] =
+ {
+ series: [
{
- series: [
- {
- markPoint : {
- data: [markPoints[i]]
- },
- markLine : {
- data: []
- }
+ markPoint: {
+ data: [markPoints[i]]
+ },
+ markLine: {
+ data: []
}
- ]
}
- }
- var option = {
- timeline : {
- type: 'number',
- playInterval:500,
- autoPlay:true,
- data: timeLine
- },
- options: options_
- };
- myChart.setOption(option);
- });
+ ]
+ }
+ }
+ var option = {
+ timeline: {
+ type: 'number',
+ playInterval: 500,
+ autoPlay: true,
+ data: timeLine
+ },
+ options: options_
+ };
+ myChart.setOption(option);
+ });
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/resources/transport/transport.html
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/transport.html b/examples/streaming/transport/src/main/resources/transport/transport.html
index fc17707..baee931 100644
--- a/examples/streaming/transport/src/main/resources/transport/transport.html
+++ b/examples/streaming/transport/src/main/resources/transport/transport.html
@@ -1,67 +1,88 @@
<!DOCTYPE html>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
<html>
<head>
- <meta charset="utf-8">
- <link rel=stylesheet type=text/css href="css/custom.css">
- <script src="http://echarts.baidu.com/build/source/echarts-all.js"></script>
- <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
+ <meta charset="utf-8">
+ <link rel=stylesheet type=text/css href="css/custom.css">
+ <script src="http://echarts.baidu.com/build/source/echarts-all.js"></script>
+ <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
</head>
<body style="background-color:#F2F2F2">
<div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;">
- <div style="height:0px"></div>
- <div id="header">
- <div style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
- Big Data Transport Monitoring Demo
- </div>
+ <div style="height:0px"></div>
+ <div id="header">
+ <div
+ style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
+ Big Data Transport Monitoring Demo
</div>
- <div id="body">
- <div id="Menu">
- <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;">
- <!-- form to post to accompany to get accompanying cars -->
+ </div>
+ <div id="body">
+ <div id="Menu">
+ <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;">
+ <!-- form to post to accompany to get accompanying cars -->
- <table style="width:100%">
- <tr>
- <td class="sidebar-label">Vehicle Id:</td>
- </tr>
- <tr>
- <td class="sidebar-label"></td>
- </tr>
- <tr>
- <td style="vertical-align:top;">
- <input id="vehicleId" class="sidebar" type="text" name="vehicleId"/>
- </td>
- </tr>
- </table>
- <div class="splitter"></div>
- <div>
- <button id="search" onclick="search_onclick()">Search</button>
- </div>
- </div>
+ <table style="width:100%">
+ <tr>
+ <td class="sidebar-label">Vehicle Id:</td>
+ </tr>
+ <tr>
+ <td class="sidebar-label"></td>
+ </tr>
+ <tr>
+ <td style="vertical-align:top;">
+ <input id="vehicleId" class="sidebar" type="text" name="vehicleId"/>
+ </td>
+ </tr>
+ </table>
+ <div class="splitter"></div>
+ <div>
+ <button id="search" onclick="search_onclick()">Search</button>
</div>
- <div id="content" style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
- <div style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
- Vehicle Trace:
- </div>
- <div style="height:7px;background-color:#92BDF2;"></div>
+ </div>
+ </div>
+ <div id="content"
+ style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
+ <div
+ style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
+ Vehicle Trace:
+ </div>
+ <div style="height:7px;background-color:#92BDF2;"></div>
- <div id="mychart"></div>
+ <div id="mychart"></div>
- <div id="mytable"></div>
- </div>
+ <div id="mytable"></div>
</div>
- <div id="footer">
- Big Data Team @ Intel
- </div>
- <script src="js/transport.js"></script>
- <script type="text/javascript">
- function search_onclick(){
- var vehicleId = document.getElementById('vehicleId').value
- initChart("mychart", vehicleId)
- }
- setInterval(updateRecords, 1000, "mytable")
- </script>
+ </div>
+ <div id="footer">
+ Big Data Team @ Intel
+ </div>
+ <script src="js/transport.js"></script>
+ <script type="text/javascript">
+ function search_onclick() {
+ var vehicleId = document.getElementById('vehicleId').value
+ initChart("mychart", vehicleId)
+ }
+ setInterval(updateRecords, 1000, "mytable")
+ </script>
</div>
</body>
</html>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala
index 7d3f35a..788f92a 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Data.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,9 +19,11 @@ package io.gearpump.streaming.examples.transport
case class LocationInfo(id: String, row: Int, column: Int)
+// scalastyle:off equals.hash.code
case class PassRecord(vehicleId: String, locationId: String, timeStamp: Long) {
override def hashCode: Int = vehicleId.hashCode
}
+// scalastyle:on equals.hash.code
case class GetTrace(vehicleId: String)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala
index 1c280f1..33d7b54 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/DataSource.scala
@@ -1,55 +1,56 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import io.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import scala.concurrent.duration._
-
-class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf){
- import taskContext.{output, parallelism, taskId, scheduleOnce}
-
- import system.dispatcher
- private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
- private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism
- private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
- private val mockCity = new MockCity(citySize)
- private val recordGenerators: Array[PassRecordGenerator] =
- PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold)
-
- override def onStart(startTime: StartTime): Unit = {
- self ! Message("start", System.currentTimeMillis())
- }
-
- override def onNext(msg: Message): Unit = {
- recordGenerators.foreach(generator =>
- output(Message(generator.getNextPassRecord(), System.currentTimeMillis())))
- scheduleOnce(1 second)(self ! Message("continue", System.currentTimeMillis()))
- }
-
- private def getIdentifier(taskId: TaskId): String = {
- s"沪A${taskId.processorId}${taskId.index}"
- }
-}
-
-object DataSource {
- final val VEHICLE_NUM = "vehicle.number"
- final val MOCK_CITY_SIZE = "mock.city.size"
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.gearpump.streaming.examples.transport
+
+import scala.concurrent.duration._
+
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+
+class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+ import taskContext.{output, parallelism, scheduleOnce, taskId}
+ private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
+ private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism
+ private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
+ private val mockCity = new MockCity(citySize)
+ private val recordGenerators: Array[PassRecordGenerator] =
+ PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold)
+
+ override def onStart(startTime: StartTime): Unit = {
+ self ! Message("start", System.currentTimeMillis())
+ }
+
+ override def onNext(msg: Message): Unit = {
+ recordGenerators.foreach(generator =>
+ output(Message(generator.getNextPassRecord(), System.currentTimeMillis())))
+ scheduleOnce(1.second)(self ! Message("continue", System.currentTimeMillis()))
+ }
+
+ private def getIdentifier(taskId: TaskId): String = {
+ // scalastyle:off non.ascii.character.disallowed
+ s"沪A${taskId.processorId}${taskId.index}"
+ // scalastyle:on non.ascii.character.disallowed
+ }
+}
+
+object DataSource {
+ final val VEHICLE_NUM = "vehicle.number"
+ final val MOCK_CITY_SIZE = "mock.city.size"
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
index 5f91883..f9dbbde 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,30 +18,29 @@
package io.gearpump.streaming.examples.transport
import java.util.concurrent.TimeUnit
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
import akka.actor.Actor._
-import akka.actor.{Actor, Props}
+import akka.actor.{Actor, ActorRefFactory, Props}
import akka.io.IO
import akka.pattern.ask
+import spray.can.Http
+import spray.http.StatusCodes
+import spray.json._
+import spray.routing.{HttpService, Route}
+import upickle.default.write
+
import io.gearpump.Message
import io.gearpump.cluster.UserConfig
import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.appmaster.AppMaster
import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
import io.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer}
import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
import io.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication}
import io.gearpump.util.Graph
-import spray.can.Http
-import spray.http.StatusCodes
-import spray.json._
-import spray.routing.HttpService
-import upickle.default.write
-
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf){
+class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import system.dispatcher
import taskContext.appMaster
@@ -50,7 +49,8 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskC
private var overSpeedRecords = List.empty[OverSpeedReport]
override def onStart(startTime: StartTime): Unit = {
- val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get)
+ val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
+ StreamApplication.DAG).get)
inspector = dag.processors.find { kv =>
val (_, processor) = kv
processor.taskClass == classOf[VelocityInspector].getName
@@ -62,7 +62,7 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskC
}
override def receiveUnManagedMessage: Receive = {
- case getTrace @ GetTrace(vehicleId: String) =>
+ case getTrace@GetTrace(vehicleId: String) =>
val parallism = inspector._2.parallelism
val processorId = inspector._1
val analyzerTaskId = TaskId(processorId, (vehicleId.hashCode & Integer.MAX_VALUE) % parallism)
@@ -74,14 +74,14 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskC
LOG.info(s"reporting $trace")
requester ! trace
}
- case record@ OverSpeedReport(vehicleId, speed, timestamp, locationId) =>
+ case record@OverSpeedReport(vehicleId, speed, timestamp, locationId) =>
LOG.info(s"vehicle $vehicleId is over speed, the speed is $speed km/h")
overSpeedRecords :+= record
case GetAllRecords =>
sender ! QueryServer.OverSpeedRecords(overSpeedRecords.toArray.sortBy(_.timestamp))
overSpeedRecords = List.empty[OverSpeedReport]
case _ =>
- //ignore
+ // Ignore
}
}
@@ -94,21 +94,22 @@ object QueryServer {
import context.dispatcher
implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
- def actorRefFactory = context
+ def actorRefFactory: ActorRefFactory = context
implicit val system = context.system
IO(Http) ! Http.Bind(self, interface = "0.0.0.0", port = 8080)
override def receive: Receive = runRoute(webServer ~ staticRoute)
- def webServer = {
- path("trace" / PathElement) { vehicleId =>
+ def webServer: Route = {
+ path("trace" / Segment) { vehicleId =>
get {
onComplete((context.parent ? GetTrace(vehicleId)).asInstanceOf[Future[VehicleTrace]]) {
case Success(trace: VehicleTrace) =>
val json = write(trace)
complete(pretty(json))
- case Failure(ex) => complete(StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")
+ case Failure(ex) => complete(StatusCodes.InternalServerError,
+ s"An error occurred: ${ex.getMessage}")
}
}
} ~
@@ -118,7 +119,8 @@ object QueryServer {
case Success(records: OverSpeedRecords) =>
val json = write(records)
complete(pretty(json))
- case Failure(ex) => complete(StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")
+ case Failure(ex) => complete(StatusCodes.InternalServerError,
+ s"An error occurred: ${ex.getMessage}")
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala
index fb3b7c1..a795277 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/Transport.scala
@@ -1,62 +1,69 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import io.gearpump.streaming.{StreamApplication, Processor}
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.util.Graph._
-import io.gearpump.util.{AkkaApp, Graph}
-
-object Transport extends AkkaApp with ArgumentsParser {
- override val options: Array[(String, CLIOption[Any])] = Array(
- "source"-> CLIOption[Int]("<how many task to generate data>", required = false, defaultValue = Some(10)),
- "inspector"-> CLIOption[Int]("<how many over speed inspector>", required = false, defaultValue = Some(4)),
- "vehicle"-> CLIOption[Int]("<how many vehicles's to generate>", required = false, defaultValue = Some(1000)),
- "citysize"-> CLIOption[Int]("<the blocks number of the mock city>", required = false, defaultValue = Some(10)),
- "threshold"-> CLIOption[Int]("<overdrive threshold, km/h>", required = false, defaultValue = Some(60)))
-
- def application(config: ParseResult): StreamApplication = {
- val sourceNum = config.getInt("source")
- val inspectorNum = config.getInt("inspector")
- val vehicleNum = config.getInt("vehicle")
- val citysize = config.getInt("citysize")
- val threshold = config.getInt("threshold")
- val source = Processor[DataSource](sourceNum)
- val inspector = Processor[VelocityInspector](inspectorNum)
- val queryServer = Processor[QueryServer](1)
- val partitioner = new HashPartitioner
-
- val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
- withInt(DataSource.MOCK_CITY_SIZE, citysize).
- withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold).
- withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
- StreamApplication("transport", Graph(source ~ partitioner ~> inspector, Node(queryServer)), userConfig)
- }
-
- override def main(akkaConf: Config, args: Array[String]): Unit = {
- val config = parse(args)
- val context = ClientContext(akkaConf)
- implicit val system = context.system
- context.submit(application(config))
- context.close()
- }
-}
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.gearpump.streaming.examples.transport
+
+import io.gearpump.cluster.UserConfig
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import io.gearpump.partitioner.HashPartitioner
+import io.gearpump.streaming.{Processor, StreamApplication}
+import io.gearpump.util.Graph._
+import io.gearpump.util.{AkkaApp, Graph}
+
+/** A city smart transportation streaming application */
+object Transport extends AkkaApp with ArgumentsParser {
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "source" -> CLIOption[Int]("<how many task to generate data>", required = false,
+ defaultValue = Some(10)),
+ "inspector" -> CLIOption[Int]("<how many over speed inspector>", required = false,
+ defaultValue = Some(4)),
+ "vehicle" -> CLIOption[Int]("<how many vehicles's to generate>", required = false,
+ defaultValue = Some(1000)),
+ "citysize" -> CLIOption[Int]("<the blocks number of the mock city>", required = false,
+ defaultValue = Some(10)),
+ "threshold" -> CLIOption[Int]("<overdrive threshold, km/h>", required = false,
+ defaultValue = Some(60)))
+
+ def application(config: ParseResult): StreamApplication = {
+ val sourceNum = config.getInt("source")
+ val inspectorNum = config.getInt("inspector")
+ val vehicleNum = config.getInt("vehicle")
+ val citysize = config.getInt("citysize")
+ val threshold = config.getInt("threshold")
+ val source = Processor[DataSource](sourceNum)
+ val inspector = Processor[VelocityInspector](inspectorNum)
+ val queryServer = Processor[QueryServer](1)
+ val partitioner = new HashPartitioner
+
+ val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
+ withInt(DataSource.MOCK_CITY_SIZE, citysize).
+ withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold).
+ withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
+ StreamApplication("transport", Graph(source ~ partitioner ~> inspector,
+ Node(queryServer)), userConfig)
+ }
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ val context = ClientContext(akkaConf)
+ implicit val system = context.system
+ context.submit(application(config))
+ context.close()
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala
index 5265d45..b9be8d7 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/VelocityInspector.scala
@@ -1,116 +1,123 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import java.util.concurrent.TimeUnit
-
-import akka.actor.Actor._
-import akka.actor.ActorRef
-import akka.pattern.ask
-import io.gearpump.streaming.{StreamApplication, ProcessorDescription, DAG}
-import io.gearpump.streaming.appmaster.AppMaster
-import io.gearpump.streaming.examples.transport.generator.MockCity
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.partitioner.PartitionerDescription
-import AppMaster.{TaskActorRef, LookupTaskActorRef}
-import io.gearpump.util.Graph
-
-import scala.collection.immutable.Queue
-import scala.collection.mutable
-import scala.concurrent.Future
-
-class VelocityInspector(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import taskContext.appMaster
- import system.dispatcher
- implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
- private val passRecords = mutable.Map.empty[String, Queue[PassRecord]]
- private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get
- private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
- private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
- private val mockCity = new MockCity(citySize)
- private var queryServerActor: ActorRef = null
-
- override def onStart(startTime: StartTime): Unit = {
- val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get)
- val queryServer = dag.processors.find { kv =>
- val (_, processor) = kv
- processor.taskClass == classOf[QueryServer].getName
- }.get
- val queryServerTaskId = TaskId(queryServer._1, 0)
- (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]].map {task =>
- queryServerActor = task.task
- }
- }
-
- import VelocityInspector._
- override def onNext(msg: Message): Unit = {
- msg.msg match {
- case passRecord: PassRecord =>
- val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord])
- if(records.size > 0) {
- val velocity = getVelocity(passRecord, records.last)
- val formatted = "%.2f".format(velocity)
- if(velocity > overdriveThreshold) {
- if(velocity > fakePlateThreshold) {
- LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, the speed is $formatted km/h")
- }
- if(queryServerActor != null) {
- queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted, passRecord.timeStamp, passRecord.locationId)
- }
- }
- }
- passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM))
- }
- }
-
- override def receiveUnManagedMessage: Receive = {
- case GetTrace(vehicleId) =>
- val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord])
- sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp))
- }
-
- private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = {
- val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId)
- val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60)
- distanceInKm / timeInHour
- }
-
- private def getDistance(location1: String, location2: String): Long = {
- mockCity.getDistance(location1, location2)
- }
-}
-
-object VelocityInspector{
- final val OVER_DRIVE_THRESHOLD = "overdrive.threshold"
- final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold"
- final val RECORDS_NUM = 20
-
- class FiniteQueue[T](q: Queue[T]) {
- def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = {
- var result = q.enqueue(elem)
- while (result.size > maxSize) {
- result = result.dequeue._2
- }
- result
- }
- }
-
- implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q)
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.gearpump.streaming.examples.transport
+
+import java.util.concurrent.TimeUnit
+import scala.collection.immutable.Queue
+import scala.collection.mutable
+import scala.concurrent.Future
+
+import akka.actor.Actor._
+import akka.actor.ActorRef
+import akka.pattern.ask
+
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.partitioner.PartitionerDescription
+import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
+import io.gearpump.streaming.examples.transport.generator.MockCity
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import io.gearpump.streaming.{DAG, ProcessorDescription, StreamApplication}
+import io.gearpump.util.Graph
+
+class VelocityInspector(taskContext: TaskContext, conf: UserConfig)
+ extends Task(taskContext, conf) {
+
+ import system.dispatcher
+ import taskContext.appMaster
+ implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
+ private val passRecords = mutable.Map.empty[String, Queue[PassRecord]]
+ private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get
+ private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
+ private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
+ private val mockCity = new MockCity(citySize)
+ private var queryServerActor: ActorRef = null
+
+ override def onStart(startTime: StartTime): Unit = {
+ val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
+ StreamApplication.DAG).get)
+ val queryServer = dag.processors.find { kv =>
+ val (_, processor) = kv
+ processor.taskClass == classOf[QueryServer].getName
+ }.get
+ val queryServerTaskId = TaskId(queryServer._1, 0)
+ (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]]
+ .map { task =>
+ queryServerActor = task.task
+ }
+ }
+
+ import io.gearpump.streaming.examples.transport.VelocityInspector._
+ override def onNext(msg: Message): Unit = {
+ msg.msg match {
+ case passRecord: PassRecord =>
+ val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord])
+ if (records.size > 0) {
+ val velocity = getVelocity(passRecord, records.last)
+ val formatted = "%.2f".format(velocity)
+ if (velocity > overdriveThreshold) {
+ if (velocity > fakePlateThreshold) {
+ LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, " +
+ s"the speed is $formatted km/h")
+ }
+ if (queryServerActor != null) {
+ queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted,
+ passRecord.timeStamp, passRecord.locationId)
+ }
+ }
+ }
+ passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM))
+ }
+ }
+
+ override def receiveUnManagedMessage: Receive = {
+ case GetTrace(vehicleId) =>
+ val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord])
+ sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp))
+ }
+
+ private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = {
+ val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId)
+ val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60)
+ distanceInKm / timeInHour
+ }
+
+ private def getDistance(location1: String, location2: String): Long = {
+ mockCity.getDistance(location1, location2)
+ }
+}
+
+object VelocityInspector {
+ final val OVER_DRIVE_THRESHOLD = "overdrive.threshold"
+ final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold"
+ final val RECORDS_NUM = 20
+
+ class FiniteQueue[T](q: Queue[T]) {
+ def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = {
+ var result = q.enqueue(elem)
+ while (result.size > maxSize) {
+ result = result.dequeue._2
+ }
+ result
+ }
+ }
+
+ import scala.language.implicitConversions
+
+ implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala
index d2786ef..ff78679 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/MockCity.scala
@@ -1,86 +1,88 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport.generator
-
-import scala.util.Random
-import MockCity._
-
-class MockCity(size: Int) {
- private val random = new Random()
- private val directions = Array(UP, DOWN, LEFT, RIGHT)
-
- def nextLocation(currentLocationId: String): String = {
- val coordinate = idToCoordinate(currentLocationId)
- val direction = directions(random.nextInt(4))
- val newCoordinate = coordinate.addOffset(direction)
- if(inCity(newCoordinate)) {
- coordinateToId(newCoordinate)
- } else {
- nextLocation(currentLocationId)
- }
- }
-
- def getDistance(locationId1: String, locationId2: String): Long = {
- val coordinate1 = idToCoordinate(locationId1)
- val coordinate2 = idToCoordinate(locationId2)
- val blocks = Math.abs(coordinate1.row - coordinate2.row) + Math.abs(coordinate1.column - coordinate2.column)
- blocks * LENGTH_PER_BLOCK
- }
-
- def randomLocationId(): String = {
- val row = random.nextInt(size)
- val column = random.nextInt(size)
- coordinateToId(Coordinate(row, column))
- }
-
- private def coordinateToId(coordinate: Coordinate): String = {
- s"Id_${coordinate.row}_${coordinate.column}"
- }
-
- private def idToCoordinate(locationId: String): Coordinate = {
- val attr = locationId.split("_")
- val row = attr(1).toInt
- val column =attr(2).toInt
- Coordinate(row, column)
- }
-
- private def inCity(coordinate: Coordinate): Boolean = {
- coordinate.row >= 0 &&
- coordinate.row < size &&
- coordinate.column >= 0 &&
- coordinate.column < size
- }
-}
-
-object MockCity{
- //The length of the mock city, km
- final val LENGTH_PER_BLOCK = 5
- //The minimal speed, km/h
- final val MINIMAL_SPEED = 10
-
- final val UP = Coordinate(0, 1)
- final val DOWN = Coordinate(0, -1)
- final val LEFT = Coordinate(-1, 0)
- final val RIGHT = Coordinate(1, 0)
-
- case class Coordinate(row: Int, column: Int) {
- def addOffset(coordinate: Coordinate): Coordinate = {
- Coordinate(this.row + coordinate.row, this.column + coordinate.column)
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.gearpump.streaming.examples.transport.generator
+
+import scala.util.Random
+
+import io.gearpump.streaming.examples.transport.generator.MockCity._
+
+class MockCity(size: Int) {
+ private val random = new Random()
+ private val directions = Array(UP, DOWN, LEFT, RIGHT)
+
+ def nextLocation(currentLocationId: String): String = {
+ val coordinate = idToCoordinate(currentLocationId)
+ val direction = directions(random.nextInt(4))
+ val newCoordinate = coordinate.addOffset(direction)
+ if (inCity(newCoordinate)) {
+ coordinateToId(newCoordinate)
+ } else {
+ nextLocation(currentLocationId)
+ }
+ }
+
+ def getDistance(locationId1: String, locationId2: String): Long = {
+ val coordinate1 = idToCoordinate(locationId1)
+ val coordinate2 = idToCoordinate(locationId2)
+ val blocks = Math.abs(coordinate1.row - coordinate2.row) +
+ Math.abs(coordinate1.column - coordinate2.column)
+ blocks * LENGTH_PER_BLOCK
+ }
+
+ def randomLocationId(): String = {
+ val row = random.nextInt(size)
+ val column = random.nextInt(size)
+ coordinateToId(Coordinate(row, column))
+ }
+
+ private def coordinateToId(coordinate: Coordinate): String = {
+ s"Id_${coordinate.row}_${coordinate.column}"
+ }
+
+ private def idToCoordinate(locationId: String): Coordinate = {
+ val attr = locationId.split("_")
+ val row = attr(1).toInt
+ val column = attr(2).toInt
+ Coordinate(row, column)
+ }
+
+ private def inCity(coordinate: Coordinate): Boolean = {
+ coordinate.row >= 0 &&
+ coordinate.row < size &&
+ coordinate.column >= 0 &&
+ coordinate.column < size
+ }
+}
+
+object MockCity {
+ // The length of the mock city, km
+ final val LENGTH_PER_BLOCK = 5
+ // The minimal speed, km/h
+ final val MINIMAL_SPEED = 10
+
+ final val UP = Coordinate(0, 1)
+ final val DOWN = Coordinate(0, -1)
+ final val LEFT = Coordinate(-1, 0)
+ final val RIGHT = Coordinate(1, 0)
+
+ case class Coordinate(row: Int, column: Int) {
+ def addOffset(coordinate: Coordinate): Coordinate = {
+ Coordinate(this.row + coordinate.row, this.column + coordinate.column)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
index 7a930d9..ee06b25 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
@@ -1,68 +1,69 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport.generator
-
-import io.gearpump.streaming.examples.transport.PassRecord
-import io.gearpump.util.LogUtil
-
-import scala.util.Random
-
-class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) {
- private val LOG = LogUtil.getLogger(getClass)
- LOG.info(s"Generate pass record for vehicle $vehicleId")
- private var timeStamp = System.currentTimeMillis()
-
- private var locationId = city.randomLocationId()
- private val random = new Random()
- private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE
- private val (randomMin, randomRange) = {
- val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat
- val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat
- val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE
- val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES)
- val randomRange = upperBound - randomMin
- (randomMin.toInt, randomRange.toInt)
- }
-
- def getNextPassRecord(): PassRecord = {
- locationId = if(fakePlate) {
- city.randomLocationId()
- } else {
- city.nextLocation(locationId)
- }
- timeStamp += (random.nextInt(randomRange) + randomMin)
- PassRecord(vehicleId, locationId, timeStamp)
- }
-}
-
-object PassRecordGenerator {
- final val FAKE_PLATE_RATE = 0.01F
- final val OVERDRIVE_RATE = 0.05F
- final val TWOMINUTES = 2 * 60 * 1000
-
- def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int): Array[PassRecordGenerator] = {
- var result = Map.empty[String, PassRecordGenerator]
- val digitsNum = (Math.log10(generatorNum) + 1).toInt
- for(i <- 1 to generatorNum) {
- val vehicleId = prefix + s"%0${digitsNum}d".format(i)
- val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold)
- result += vehicleId -> generator
- }
- result.values.toArray
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.gearpump.streaming.examples.transport.generator
+
+import scala.util.Random
+
+import io.gearpump.streaming.examples.transport.PassRecord
+import io.gearpump.util.LogUtil
+
+class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) {
+ private val LOG = LogUtil.getLogger(getClass)
+ LOG.info(s"Generate pass record for vehicle $vehicleId")
+ private var timeStamp = System.currentTimeMillis()
+
+ private var locationId = city.randomLocationId()
+ private val random = new Random()
+ private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE
+ private val (randomMin, randomRange) = {
+ val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat
+ val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat
+ val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE
+ val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES)
+ val randomRange = upperBound - randomMin
+ (randomMin.toInt, randomRange.toInt)
+ }
+
+ def getNextPassRecord(): PassRecord = {
+ locationId = if (fakePlate) {
+ city.randomLocationId()
+ } else {
+ city.nextLocation(locationId)
+ }
+ timeStamp += (random.nextInt(randomRange) + randomMin)
+ PassRecord(vehicleId, locationId, timeStamp)
+ }
+}
+
+object PassRecordGenerator {
+ final val FAKE_PLATE_RATE = 0.01F
+ final val OVERDRIVE_RATE = 0.05F
+ final val TWOMINUTES = 2 * 60 * 1000
+
+ def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int)
+ : Array[PassRecordGenerator] = {
+ var result = Map.empty[String, PassRecordGenerator]
+ val digitsNum = (Math.log10(generatorNum) + 1).toInt
+ for (i <- 1 to generatorNum) {
+ val vehicleId = prefix + s"%0${digitsNum}d".format(i)
+ val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold)
+ result += vehicleId -> generator
+ }
+ result.values.toArray
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
index adf44c0..75f9d60 100644
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
+++ b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,15 +17,14 @@
*/
package io.gearpump.streaming.examples.transport
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.examples.transport.{VelocityInspector, DataSource}
-import io.gearpump.streaming.task.StartTime
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.examples.transport.VelocityInspector
import org.mockito.Matchers._
import org.mockito.Mockito._
-import org.scalatest.{Matchers, FlatSpec}
+import org.scalatest.{FlatSpec, Matchers}
+
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.task.StartTime
class DataSourceSpec extends FlatSpec with Matchers {
it should "create the pass record" in {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
index 94b89e1..b61fd43 100644
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
+++ b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,19 +17,19 @@
*/
package io.gearpump.streaming.examples.transport
-import io.gearpump.streaming.examples.transport.Transport
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{TestUtil, MasterHarness}
-import io.gearpump.util.Util
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, Matchers, PropSpec}
+import scala.concurrent.Future
+import scala.util.Success
+
import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
-import scala.util.Success
+import io.gearpump.cluster.ClientToMaster.SubmitApplication
+import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import io.gearpump.cluster.{MasterHarness, TestUtil}
-import scala.concurrent.Future
+class TransportSpec
+ extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
-class TransportSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
override def beforeAll {
startActorSystem()
}
@@ -38,9 +38,9 @@ class TransportSpec extends PropSpec with PropertyChecks with Matchers with Befo
shutdownActorSystem()
}
- override def config = TestUtil.DEFAULT_CONFIG
+ protected override def config = TestUtil.DEFAULT_CONFIG
- property("Transport should succeed to submit application with required arguments"){
+ property("Transport should succeed to submit application with required arguments") {
val requiredArgs = Array.empty[String]
val optionalArgs = Array(
"-source", "1",
@@ -59,10 +59,11 @@ class TransportSpec extends PropSpec with PropertyChecks with Matchers with Befo
forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
val args = requiredArgs ++ optionalArgs
- Future {Transport.main(masterConfig, args)}
+ Future {
+ Transport.main(masterConfig, args)
+ }
masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
masterReceiver.reply(SubmitApplicationResult(Success(0)))
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
index 2f1495d..e91d91c 100644
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
+++ b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
@@ -1,32 +1,31 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport.generator
-
-import io.gearpump.streaming.examples.transport.generator.MockCity
-import org.scalatest.{Matchers, PropSpec}
-import org.scalatest.prop.PropertyChecks
-
-class MockCitySpec extends PropSpec with PropertyChecks with Matchers{
-
- property("MockCity should maintain the location properly") {
- val city = new MockCity(10)
- val start = city.randomLocationId()
- val nextLocation = city.nextLocation(start)
- assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK)
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.gearpump.streaming.examples.transport.generator
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class MockCitySpec extends PropSpec with PropertyChecks with Matchers {
+
+ property("MockCity should maintain the location properly") {
+ val city = new MockCity(10)
+ val start = city.randomLocationId()
+ val nextLocation = city.nextLocation(start)
+ assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
index d1ce32d..1c1901e 100644
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
+++ b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,19 +17,18 @@
*/
package io.gearpump.streaming.examples.transport.generator
-import io.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
-import io.gearpump.streaming.examples.transport.generator.MockCity
-import org.scalatest.{Matchers, PropSpec}
import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
-class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers{
+class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers {
- property("PassRecordGenerator should generate pass record"){
+ property("PassRecordGenerator should generate pass record") {
val id = "test"
val city = new MockCity(10)
val generator = new PassRecordGenerator(id, city, 60)
val passrecord1 = generator.getNextPassRecord()
val passrecord2 = generator.getNextPassRecord()
- assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) == MockCity.LENGTH_PER_BLOCK)
+ assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) ==
+ MockCity.LENGTH_PER_BLOCK)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
index 64a0f8b..720e179 100644
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
+++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
index bb45651..28cf8cb 100644
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
+++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
import java.util.HashMap;
-public class Sum extends Task {
+public class Sum extends Task {
private Logger LOG = super.LOG();
private HashMap<String, Integer> wordCount = new HashMap<String, Integer>();
@@ -43,7 +43,7 @@ public class Sum extends Task {
@Override
public void onNext(Message messagePayLoad) {
- String word = (String)(messagePayLoad.msg());
+ String word = (String) (messagePayLoad.msg());
Integer current = wordCount.get(word);
if (current == null) {
current = 0;
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
index 9e1e7d5..40054d3 100644
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -29,6 +29,7 @@ import io.gearpump.streaming.javaapi.Graph;
import io.gearpump.streaming.javaapi.Processor;
import io.gearpump.streaming.javaapi.StreamApplication;
+/** Java version of WordCount with Processor Graph API */
public class WordCount {
public static void main(String[] args) throws InterruptedException {
@@ -53,11 +54,9 @@ public class WordCount {
Partitioner partitioner = new HashPartitioner();
graph.addEdge(split, partitioner, sum);
-
UserConfig conf = UserConfig.empty();
StreamApplication app = new StreamApplication("wordcountJava", conf, graph);
-
EmbeddedCluster localCluster = null;
Boolean debugMode = System.getProperty("DEBUG") != null;
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index 6857017..3aefd7f 100644
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -34,6 +34,7 @@ import scala.Tuple2;
import java.util.Iterator;
import java.util.List;
+/** Java version of WordCount with high level DSL API */
public class WordCount {
public static void main(String[] args) throws InterruptedException {
@@ -61,14 +62,14 @@ public class WordCount {
}
}, "map");
- JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String,Integer>, String>() {
+ JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() {
@Override
public String apply(Tuple2<String, Integer> tuple) {
return tuple._1();
}
}, 1, "groupBy");
- JavaStream<Tuple2<String, Integer>> wordcount =groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+ JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
@@ -80,7 +81,4 @@ public class WordCount {
app.run();
context.close();
}
-
-
-
}