You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:39 UTC
[29/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
deleted file mode 100644
index ee06b25..0000000
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport.generator
-
-import scala.util.Random
-
-import io.gearpump.streaming.examples.transport.PassRecord
-import io.gearpump.util.LogUtil
-
-class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) {
- private val LOG = LogUtil.getLogger(getClass)
- LOG.info(s"Generate pass record for vehicle $vehicleId")
- private var timeStamp = System.currentTimeMillis()
-
- private var locationId = city.randomLocationId()
- private val random = new Random()
- private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE
- private val (randomMin, randomRange) = {
- val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat
- val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat
- val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE
- val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES)
- val randomRange = upperBound - randomMin
- (randomMin.toInt, randomRange.toInt)
- }
-
- def getNextPassRecord(): PassRecord = {
- locationId = if (fakePlate) {
- city.randomLocationId()
- } else {
- city.nextLocation(locationId)
- }
- timeStamp += (random.nextInt(randomRange) + randomMin)
- PassRecord(vehicleId, locationId, timeStamp)
- }
-}
-
-object PassRecordGenerator {
- final val FAKE_PLATE_RATE = 0.01F
- final val OVERDRIVE_RATE = 0.05F
- final val TWOMINUTES = 2 * 60 * 1000
-
- def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int)
- : Array[PassRecordGenerator] = {
- var result = Map.empty[String, PassRecordGenerator]
- val digitsNum = (Math.log10(generatorNum) + 1).toInt
- for (i <- 1 to generatorNum) {
- val vehicleId = prefix + s"%0${digitsNum}d".format(i)
- val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold)
- result += vehicleId -> generator
- }
- result.values.toArray
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
new file mode 100644
index 0000000..0aaf72c
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+case class LocationInfo(id: String, row: Int, column: Int)
+
+// scalastyle:off equals.hash.code
+case class PassRecord(vehicleId: String, locationId: String, timeStamp: Long) {
+ override def hashCode: Int = vehicleId.hashCode
+}
+// scalastyle:on equals.hash.code
+
+case class GetTrace(vehicleId: String)
+
+case class VehicleTrace(records: Array[PassRecord])
+
+case class OverSpeedReport(vehicleId: String, speed: String, timestamp: Long, locationId: String)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
new file mode 100644
index 0000000..555e850
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+import scala.concurrent.duration._
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+
+class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+ import taskContext.{output, parallelism, scheduleOnce, taskId}
+ private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
+ private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism
+ private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
+ private val mockCity = new MockCity(citySize)
+ private val recordGenerators: Array[PassRecordGenerator] =
+ PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold)
+
+ override def onStart(startTime: StartTime): Unit = {
+ self ! Message("start", System.currentTimeMillis())
+ }
+
+ override def onNext(msg: Message): Unit = {
+ recordGenerators.foreach(generator =>
+ output(Message(generator.getNextPassRecord(), System.currentTimeMillis())))
+ scheduleOnce(1.second)(self ! Message("continue", System.currentTimeMillis()))
+ }
+
+ private def getIdentifier(taskId: TaskId): String = {
+ // scalastyle:off non.ascii.character.disallowed
+ s"\u6caaA${taskId.processorId}${taskId.index}"
+ // scalastyle:on non.ascii.character.disallowed
+ }
+}
+
+object DataSource {
+ final val VEHICLE_NUM = "vehicle.number"
+ final val MOCK_CITY_SIZE = "mock.city.size"
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
new file mode 100644
index 0000000..ff3b4b4
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+import akka.actor.Actor._
+import akka.actor.{Actor, ActorRefFactory, Props}
+import akka.io.IO
+import akka.pattern.ask
+import spray.can.Http
+import spray.http.StatusCodes
+import spray.json._
+import spray.routing.{HttpService, Route}
+import upickle.default.write
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
+import org.apache.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer}
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import org.apache.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication}
+import org.apache.gearpump.util.Graph
+
+class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+ import system.dispatcher
+ import taskContext.appMaster
+
+ var inspector: (ProcessorId, ProcessorDescription) = null
+ implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
+ private var overSpeedRecords = List.empty[OverSpeedReport]
+
+ override def onStart(startTime: StartTime): Unit = {
+ val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
+ StreamApplication.DAG).get)
+ inspector = dag.processors.find { kv =>
+ val (_, processor) = kv
+ processor.taskClass == classOf[VelocityInspector].getName
+ }.get
+ taskContext.actorOf(Props(new WebServer))
+ }
+
+ override def onNext(msg: Message): Unit = {
+ }
+
+ override def receiveUnManagedMessage: Receive = {
+ case getTrace@GetTrace(vehicleId: String) =>
+ val parallism = inspector._2.parallelism
+ val processorId = inspector._1
+ val analyzerTaskId = TaskId(processorId, (vehicleId.hashCode & Integer.MAX_VALUE) % parallism)
+ val requester = sender
+ (appMaster ? LookupTaskActorRef(analyzerTaskId))
+ .asInstanceOf[Future[TaskActorRef]].flatMap { task =>
+ (task.task ? getTrace).asInstanceOf[Future[VehicleTrace]]
+ }.map { trace =>
+ LOG.info(s"reporting $trace")
+ requester ! trace
+ }
+ case record@OverSpeedReport(vehicleId, speed, timestamp, locationId) =>
+ LOG.info(s"vehicle $vehicleId is over speed, the speed is $speed km/h")
+ overSpeedRecords :+= record
+ case GetAllRecords =>
+ sender ! QueryServer.OverSpeedRecords(overSpeedRecords.toArray.sortBy(_.timestamp))
+ overSpeedRecords = List.empty[OverSpeedReport]
+ case _ =>
+ // Ignore
+ }
+}
+
+object QueryServer {
+ object GetAllRecords
+
+ case class OverSpeedRecords(records: Array[OverSpeedReport])
+
+ class WebServer extends Actor with HttpService {
+
+ import context.dispatcher
+ implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
+ def actorRefFactory: ActorRefFactory = context
+ implicit val system = context.system
+
+ IO(Http) ! Http.Bind(self, interface = "0.0.0.0", port = 8080)
+
+ override def receive: Receive = runRoute(webServer ~ staticRoute)
+
+ def webServer: Route = {
+ path("trace" / Segment) { vehicleId =>
+ get {
+ onComplete((context.parent ? GetTrace(vehicleId)).asInstanceOf[Future[VehicleTrace]]) {
+ case Success(trace: VehicleTrace) =>
+ val json = write(trace)
+ complete(pretty(json))
+ case Failure(ex) => complete(StatusCodes.InternalServerError,
+ s"An error occurred: ${ex.getMessage}")
+ }
+ }
+ } ~
+ path("records") {
+ get {
+ onComplete((context.parent ? GetAllRecords).asInstanceOf[Future[OverSpeedRecords]]) {
+ case Success(records: OverSpeedRecords) =>
+ val json = write(records)
+ complete(pretty(json))
+ case Failure(ex) => complete(StatusCodes.InternalServerError,
+ s"An error occurred: ${ex.getMessage}")
+ }
+ }
+ }
+ }
+
+ val staticRoute = {
+ pathEndOrSingleSlash {
+ getFromResource("transport/transport.html")
+ } ~
+ pathPrefix("css") {
+ get {
+ getFromResourceDirectory("transport/css")
+ }
+ } ~
+ pathPrefix("svg") {
+ get {
+ getFromResourceDirectory("transport/svg")
+ }
+ } ~
+ pathPrefix("js") {
+ get {
+ getFromResourceDirectory("transport/js")
+ }
+ }
+ }
+
+ private def pretty(json: String): String = {
+ json.parseJson.prettyPrint
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
new file mode 100644
index 0000000..5beb2e1
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph._
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/** A city smart transportation streaming application */
+object Transport extends AkkaApp with ArgumentsParser {
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "source" -> CLIOption[Int]("<how many task to generate data>", required = false,
+ defaultValue = Some(10)),
+ "inspector" -> CLIOption[Int]("<how many over speed inspector>", required = false,
+ defaultValue = Some(4)),
+ "vehicle" -> CLIOption[Int]("<how many vehicles's to generate>", required = false,
+ defaultValue = Some(1000)),
+ "citysize" -> CLIOption[Int]("<the blocks number of the mock city>", required = false,
+ defaultValue = Some(10)),
+ "threshold" -> CLIOption[Int]("<overdrive threshold, km/h>", required = false,
+ defaultValue = Some(60)))
+
+ def application(config: ParseResult): StreamApplication = {
+ val sourceNum = config.getInt("source")
+ val inspectorNum = config.getInt("inspector")
+ val vehicleNum = config.getInt("vehicle")
+ val citysize = config.getInt("citysize")
+ val threshold = config.getInt("threshold")
+ val source = Processor[DataSource](sourceNum)
+ val inspector = Processor[VelocityInspector](inspectorNum)
+ val queryServer = Processor[QueryServer](1)
+ val partitioner = new HashPartitioner
+
+ val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
+ withInt(DataSource.MOCK_CITY_SIZE, citysize).
+ withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold).
+ withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
+ StreamApplication("transport", Graph(source ~ partitioner ~> inspector,
+ Node(queryServer)), userConfig)
+ }
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ val context = ClientContext(akkaConf)
+ implicit val system = context.system
+ context.submit(application(config))
+ context.close()
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
new file mode 100644
index 0000000..4d9bd04
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+import java.util.concurrent.TimeUnit
+import scala.collection.immutable.Queue
+import scala.collection.mutable
+import scala.concurrent.Future
+
+import akka.actor.Actor._
+import akka.actor.ActorRef
+import akka.pattern.ask
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
+import org.apache.gearpump.streaming.examples.transport.generator.MockCity
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import org.apache.gearpump.streaming.{DAG, ProcessorDescription, StreamApplication}
+import org.apache.gearpump.util.Graph
+
+class VelocityInspector(taskContext: TaskContext, conf: UserConfig)
+ extends Task(taskContext, conf) {
+
+ import system.dispatcher
+ import taskContext.appMaster
+ implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
+ private val passRecords = mutable.Map.empty[String, Queue[PassRecord]]
+ private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get
+ private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
+ private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
+ private val mockCity = new MockCity(citySize)
+ private var queryServerActor: ActorRef = null
+
+ override def onStart(startTime: StartTime): Unit = {
+ val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
+ StreamApplication.DAG).get)
+ val queryServer = dag.processors.find { kv =>
+ val (_, processor) = kv
+ processor.taskClass == classOf[QueryServer].getName
+ }.get
+ val queryServerTaskId = TaskId(queryServer._1, 0)
+ (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]]
+ .map { task =>
+ queryServerActor = task.task
+ }
+ }
+
+ import org.apache.gearpump.streaming.examples.transport.VelocityInspector._
+ override def onNext(msg: Message): Unit = {
+ msg.msg match {
+ case passRecord: PassRecord =>
+ val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord])
+ if (records.size > 0) {
+ val velocity = getVelocity(passRecord, records.last)
+ val formatted = "%.2f".format(velocity)
+ if (velocity > overdriveThreshold) {
+ if (velocity > fakePlateThreshold) {
+ LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, " +
+ s"the speed is $formatted km/h")
+ }
+ if (queryServerActor != null) {
+ queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted,
+ passRecord.timeStamp, passRecord.locationId)
+ }
+ }
+ }
+ passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM))
+ }
+ }
+
+ override def receiveUnManagedMessage: Receive = {
+ case GetTrace(vehicleId) =>
+ val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord])
+ sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp))
+ }
+
+ private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = {
+ val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId)
+ val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60)
+ distanceInKm / timeInHour
+ }
+
+ private def getDistance(location1: String, location2: String): Long = {
+ mockCity.getDistance(location1, location2)
+ }
+}
+
+object VelocityInspector {
+ final val OVER_DRIVE_THRESHOLD = "overdrive.threshold"
+ final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold"
+ final val RECORDS_NUM = 20
+
+ class FiniteQueue[T](q: Queue[T]) {
+ def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = {
+ var result = q.enqueue(elem)
+ while (result.size > maxSize) {
+ result = result.dequeue._2
+ }
+ result
+ }
+ }
+
+ import scala.language.implicitConversions
+
+ implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
new file mode 100644
index 0000000..60e0bcf
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport.generator
+
+import scala.util.Random
+
+import org.apache.gearpump.streaming.examples.transport.generator.MockCity._
+
+class MockCity(size: Int) {
+ private val random = new Random()
+ private val directions = Array(UP, DOWN, LEFT, RIGHT)
+
+ def nextLocation(currentLocationId: String): String = {
+ val coordinate = idToCoordinate(currentLocationId)
+ val direction = directions(random.nextInt(4))
+ val newCoordinate = coordinate.addOffset(direction)
+ if (inCity(newCoordinate)) {
+ coordinateToId(newCoordinate)
+ } else {
+ nextLocation(currentLocationId)
+ }
+ }
+
+ def getDistance(locationId1: String, locationId2: String): Long = {
+ val coordinate1 = idToCoordinate(locationId1)
+ val coordinate2 = idToCoordinate(locationId2)
+ val blocks = Math.abs(coordinate1.row - coordinate2.row) +
+ Math.abs(coordinate1.column - coordinate2.column)
+ blocks * LENGTH_PER_BLOCK
+ }
+
+ def randomLocationId(): String = {
+ val row = random.nextInt(size)
+ val column = random.nextInt(size)
+ coordinateToId(Coordinate(row, column))
+ }
+
+ private def coordinateToId(coordinate: Coordinate): String = {
+ s"Id_${coordinate.row}_${coordinate.column}"
+ }
+
+ private def idToCoordinate(locationId: String): Coordinate = {
+ val attr = locationId.split("_")
+ val row = attr(1).toInt
+ val column = attr(2).toInt
+ Coordinate(row, column)
+ }
+
+ private def inCity(coordinate: Coordinate): Boolean = {
+ coordinate.row >= 0 &&
+ coordinate.row < size &&
+ coordinate.column >= 0 &&
+ coordinate.column < size
+ }
+}
+
+object MockCity {
+ // The length of the mock city, km
+ final val LENGTH_PER_BLOCK = 5
+ // The minimal speed, km/h
+ final val MINIMAL_SPEED = 10
+
+ final val UP = Coordinate(0, 1)
+ final val DOWN = Coordinate(0, -1)
+ final val LEFT = Coordinate(-1, 0)
+ final val RIGHT = Coordinate(1, 0)
+
+ case class Coordinate(row: Int, column: Int) {
+ def addOffset(coordinate: Coordinate): Coordinate = {
+ Coordinate(this.row + coordinate.row, this.column + coordinate.column)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
new file mode 100644
index 0000000..e8c1c59
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport.generator
+
+import scala.util.Random
+
+import org.apache.gearpump.streaming.examples.transport.PassRecord
+import org.apache.gearpump.util.LogUtil
+
+class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) {
+ private val LOG = LogUtil.getLogger(getClass)
+ LOG.info(s"Generate pass record for vehicle $vehicleId")
+ private var timeStamp = System.currentTimeMillis()
+
+ private var locationId = city.randomLocationId()
+ private val random = new Random()
+ private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE
+ private val (randomMin, randomRange) = {
+ val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat
+ val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat
+ val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE
+ val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES)
+ val randomRange = upperBound - randomMin
+ (randomMin.toInt, randomRange.toInt)
+ }
+
+ def getNextPassRecord(): PassRecord = {
+ locationId = if (fakePlate) {
+ city.randomLocationId()
+ } else {
+ city.nextLocation(locationId)
+ }
+ timeStamp += (random.nextInt(randomRange) + randomMin)
+ PassRecord(vehicleId, locationId, timeStamp)
+ }
+}
+
+object PassRecordGenerator {
+ final val FAKE_PLATE_RATE = 0.01F
+ final val OVERDRIVE_RATE = 0.05F
+ final val TWOMINUTES = 2 * 60 * 1000
+
+ def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int)
+ : Array[PassRecordGenerator] = {
+ var result = Map.empty[String, PassRecordGenerator]
+ val digitsNum = (Math.log10(generatorNum) + 1).toInt
+ for (i <- 1 to generatorNum) {
+ val vehicleId = prefix + s"%0${digitsNum}d".format(i)
+ val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold)
+ result += vehicleId -> generator
+ }
+ result.values.toArray
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
deleted file mode 100644
index 75f9d60..0000000
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-
-class DataSourceSpec extends FlatSpec with Matchers {
- it should "create the pass record" in {
- val vehicleNum = 2
- val context = MockUtil.mockTaskContext
-
- val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
- withInt(DataSource.MOCK_CITY_SIZE, 10).
- withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, 60).
- withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
-
- val source = new DataSource(context, userConfig)
- source.onStart(StartTime(0))
- source.onNext(Message("start"))
- verify(context, times(vehicleNum)).output(any[Message])
- source.onStop()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
deleted file mode 100644
index b61fd43..0000000
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class TransportSpec
- extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
-
- override def beforeAll {
- startActorSystem()
- }
-
- override def afterAll {
- shutdownActorSystem()
- }
-
- protected override def config = TestUtil.DEFAULT_CONFIG
-
- property("Transport should succeed to submit application with required arguments") {
- val requiredArgs = Array.empty[String]
- val optionalArgs = Array(
- "-source", "1",
- "-inspector", "1",
- "-vehicle", "100",
- "-citysize", "10",
- "-threshold", "60")
-
- val args = {
- Table(
- ("requiredArgs", "optionalArgs"),
- (requiredArgs, optionalArgs)
- )
- }
- val masterReceiver = createMockMaster()
- forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
- val args = requiredArgs ++ optionalArgs
-
- Future {
- Transport.main(masterConfig, args)
- }
- masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
- masterReceiver.reply(SubmitApplicationResult(Success(0)))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
deleted file mode 100644
index e91d91c..0000000
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport.generator
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class MockCitySpec extends PropSpec with PropertyChecks with Matchers {
-
- property("MockCity should maintain the location properly") {
- val city = new MockCity(10)
- val start = city.randomLocationId()
- val nextLocation = city.nextLocation(start)
- assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
deleted file mode 100644
index 1c1901e..0000000
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport.generator
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers {
-
- property("PassRecordGenerator should generate pass record") {
- val id = "test"
- val city = new MockCity(10)
- val generator = new PassRecordGenerator(id, city, 60)
- val passrecord1 = generator.getNextPassRecord()
- val passrecord2 = generator.getNextPassRecord()
- assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) ==
- MockCity.LENGTH_PER_BLOCK)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
new file mode 100644
index 0000000..1f525ae
--- /dev/null
+++ b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+
+class DataSourceSpec extends FlatSpec with Matchers {
+ it should "create the pass record" in {
+ val vehicleNum = 2
+ val context = MockUtil.mockTaskContext
+
+ val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
+ withInt(DataSource.MOCK_CITY_SIZE, 10).
+ withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, 60).
+ withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
+
+ val source = new DataSource(context, userConfig)
+ source.onStart(StartTime(0))
+ source.onNext(Message("start"))
+ verify(context, times(vehicleNum)).output(any[Message])
+ source.onStop()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
new file mode 100644
index 0000000..2f83de5
--- /dev/null
+++ b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+
+class TransportSpec
+ extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
+
+ override def beforeAll {
+ startActorSystem()
+ }
+
+ override def afterAll {
+ shutdownActorSystem()
+ }
+
+ protected override def config = TestUtil.DEFAULT_CONFIG
+
+ property("Transport should succeed to submit application with required arguments") {
+ val requiredArgs = Array.empty[String]
+ val optionalArgs = Array(
+ "-source", "1",
+ "-inspector", "1",
+ "-vehicle", "100",
+ "-citysize", "10",
+ "-threshold", "60")
+
+ val args = {
+ Table(
+ ("requiredArgs", "optionalArgs"),
+ (requiredArgs, optionalArgs)
+ )
+ }
+ val masterReceiver = createMockMaster()
+ forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
+ val args = requiredArgs ++ optionalArgs
+
+ Future {
+ Transport.main(masterConfig, args)
+ }
+ masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+ masterReceiver.reply(SubmitApplicationResult(Success(0)))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
new file mode 100644
index 0000000..ba4eb2d
--- /dev/null
+++ b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport.generator
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class MockCitySpec extends PropSpec with PropertyChecks with Matchers {
+
+ property("MockCity should maintain the location properly") {
+ val city = new MockCity(10)
+ val start = city.randomLocationId()
+ val nextLocation = city.nextLocation(start)
+ assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
new file mode 100644
index 0000000..f0eebbf
--- /dev/null
+++ b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport.generator
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers {
+
+ property("PassRecordGenerator should generate pass record") {
+ val id = "test"
+ val city = new MockCity(10)
+ val generator = new PassRecordGenerator(id, city, 60)
+ val passrecord1 = generator.getNextPassRecord()
+ val passrecord2 = generator.getNextPassRecord()
+ assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) ==
+ MockCity.LENGTH_PER_BLOCK)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
deleted file mode 100644
index 720e179..0000000
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcountjava;
-
-import io.gearpump.Message;
-import io.gearpump.cluster.UserConfig;
-import io.gearpump.streaming.javaapi.Task;
-import io.gearpump.streaming.task.StartTime;
-import io.gearpump.streaming.task.TaskContext;
-
-public class Split extends Task {
-
- public static String TEXT = "This is a good start for java! bingo! bingo! ";
-
- public Split(TaskContext taskContext, UserConfig userConf) {
- super(taskContext, userConf);
- }
-
- private Long now() {
- return System.currentTimeMillis();
- }
-
- @Override
- public void onStart(StartTime startTime) {
- self().tell(new Message("start", now()), self());
- }
-
- @Override
- public void onNext(Message msg) {
-
- // Split the TEXT to words
- String[] words = TEXT.split(" ");
- for (int i = 0; i < words.length; i++) {
- context.output(new Message(words[i], now()));
- }
- self().tell(new Message("next", now()), self());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
deleted file mode 100644
index 28cf8cb..0000000
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcountjava;
-
-import io.gearpump.Message;
-import io.gearpump.cluster.UserConfig;
-import io.gearpump.streaming.javaapi.Task;
-import io.gearpump.streaming.task.StartTime;
-import io.gearpump.streaming.task.TaskContext;
-import org.slf4j.Logger;
-
-import java.util.HashMap;
-
-public class Sum extends Task {
-
- private Logger LOG = super.LOG();
- private HashMap<String, Integer> wordCount = new HashMap<String, Integer>();
-
- public Sum(TaskContext taskContext, UserConfig userConf) {
- super(taskContext, userConf);
- }
-
- @Override
- public void onStart(StartTime startTime) {
- //skip
- }
-
- @Override
- public void onNext(Message messagePayLoad) {
- String word = (String) (messagePayLoad.msg());
- Integer current = wordCount.get(word);
- if (current == null) {
- current = 0;
- }
- Integer newCount = current + 1;
- wordCount.put(word, newCount);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
deleted file mode 100644
index 40054d3..0000000
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcountjava;
-
-import com.typesafe.config.Config;
-import io.gearpump.cluster.ClusterConfig;
-import io.gearpump.cluster.UserConfig;
-import io.gearpump.cluster.client.ClientContext;
-import io.gearpump.cluster.embedded.EmbeddedCluster;
-import io.gearpump.partitioner.HashPartitioner;
-import io.gearpump.partitioner.Partitioner;
-import io.gearpump.streaming.javaapi.Graph;
-import io.gearpump.streaming.javaapi.Processor;
-import io.gearpump.streaming.javaapi.StreamApplication;
-
-/** Java version of WordCount with Processor Graph API */
-public class WordCount {
-
- public static void main(String[] args) throws InterruptedException {
- main(ClusterConfig.defaultConfig(), args);
- }
-
- public static void main(Config akkaConf, String[] args) throws InterruptedException {
-
- // For split task, we config to create two tasks
- int splitTaskNumber = 2;
- Processor split = new Processor(Split.class).withParallelism(splitTaskNumber);
-
- // For sum task, we have two summer.
- int sumTaskNumber = 2;
- Processor sum = new Processor(Sum.class).withParallelism(sumTaskNumber);
-
- // construct the graph
- Graph graph = new Graph();
- graph.addVertex(split);
- graph.addVertex(sum);
-
- Partitioner partitioner = new HashPartitioner();
- graph.addEdge(split, partitioner, sum);
-
- UserConfig conf = UserConfig.empty();
- StreamApplication app = new StreamApplication("wordcountJava", conf, graph);
-
- EmbeddedCluster localCluster = null;
-
- Boolean debugMode = System.getProperty("DEBUG") != null;
-
- if (debugMode) {
- localCluster = new EmbeddedCluster(akkaConf);
- localCluster.start();
- }
-
- ClientContext masterClient = null;
-
- if (localCluster != null) {
- masterClient = localCluster.newClientContext();
- } else {
- // create master client
- // It will read the master settings under gearpump.cluster.masters
- masterClient = new ClientContext(akkaConf);
- }
-
- masterClient.submit(app);
-
- if (debugMode) {
- Thread.sleep(30 * 1000); // sleep for 30 seconds.
- }
-
- masterClient.close();
-
- if (localCluster != null) {
- localCluster.stop();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
deleted file mode 100644
index 3aefd7f..0000000
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcountjava.dsl;
-
-import com.typesafe.config.Config;
-import io.gearpump.cluster.ClusterConfig;
-import io.gearpump.cluster.UserConfig;
-import io.gearpump.cluster.client.ClientContext;
-import io.gearpump.google.common.collect.Lists;
-import io.gearpump.streaming.dsl.javaapi.JavaStream;
-import io.gearpump.streaming.dsl.javaapi.JavaStreamApp;
-import io.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
-import io.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
-import io.gearpump.streaming.javaapi.dsl.functions.MapFunction;
-import io.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
-import scala.Tuple2;
-
-import java.util.Iterator;
-import java.util.List;
-
-/** Java version of WordCount with high level DSL API */
-public class WordCount {
-
- public static void main(String[] args) throws InterruptedException {
- main(ClusterConfig.defaultConfig(), args);
- }
-
- public static void main(Config akkaConf, String[] args) throws InterruptedException {
- ClientContext context = new ClientContext(akkaConf);
- JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
- List<String> source = Lists.newArrayList("This is a good start, bingo!! bingo!!");
-
- JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source");
-
- JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> apply(String s) {
- return Lists.newArrayList(s.split("\\s+")).iterator();
- }
- }, "flatMap");
-
- JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> apply(String s) {
- return new Tuple2<String, Integer>(s, 1);
- }
- }, "map");
-
- JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() {
- @Override
- public String apply(Tuple2<String, Integer> tuple) {
- return tuple._1();
- }
- }, 1, "groupBy");
-
- JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
- return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
- }
- }, "reduce");
-
- wordcount.log();
-
- app.run();
- context.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
new file mode 100644
index 0000000..76069c1
--- /dev/null
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcountjava;
+
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.javaapi.Task;
+import org.apache.gearpump.streaming.task.StartTime;
+import org.apache.gearpump.streaming.task.TaskContext;
+
+public class Split extends Task {
+
+ public static String TEXT = "This is a good start for java! bingo! bingo! ";
+
+ public Split(TaskContext taskContext, UserConfig userConf) {
+ super(taskContext, userConf);
+ }
+
+ private Long now() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public void onStart(StartTime startTime) {
+ self().tell(new Message("start", now()), self());
+ }
+
+ @Override
+ public void onNext(Message msg) {
+
+ // Split the TEXT to words
+ String[] words = TEXT.split(" ");
+ for (int i = 0; i < words.length; i++) {
+ context.output(new Message(words[i], now()));
+ }
+ self().tell(new Message("next", now()), self());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
new file mode 100644
index 0000000..89c3b14
--- /dev/null
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcountjava;
+
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.javaapi.Task;
+import org.apache.gearpump.streaming.task.StartTime;
+import org.apache.gearpump.streaming.task.TaskContext;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+
+public class Sum extends Task {
+
+ private Logger LOG = super.LOG();
+ private HashMap<String, Integer> wordCount = new HashMap<String, Integer>();
+
+ public Sum(TaskContext taskContext, UserConfig userConf) {
+ super(taskContext, userConf);
+ }
+
+ @Override
+ public void onStart(StartTime startTime) {
+ //skip
+ }
+
+ @Override
+ public void onNext(Message messagePayLoad) {
+ String word = (String) (messagePayLoad.msg());
+ Integer current = wordCount.get(word);
+ if (current == null) {
+ current = 0;
+ }
+ Integer newCount = current + 1;
+ wordCount.put(word, newCount);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
new file mode 100644
index 0000000..ee44536
--- /dev/null
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcountjava;
+
+import com.typesafe.config.Config;
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.apache.gearpump.partitioner.HashPartitioner;
+import org.apache.gearpump.partitioner.Partitioner;
+import org.apache.gearpump.streaming.javaapi.Graph;
+import org.apache.gearpump.streaming.javaapi.Processor;
+import org.apache.gearpump.streaming.javaapi.StreamApplication;
+
+/** Java version of WordCount with Processor Graph API */
+public class WordCount {
+
+ public static void main(String[] args) throws InterruptedException {
+ main(ClusterConfig.defaultConfig(), args);
+ }
+
+ public static void main(Config akkaConf, String[] args) throws InterruptedException {
+
+ // For split task, we config to create two tasks
+ int splitTaskNumber = 2;
+ Processor split = new Processor(Split.class).withParallelism(splitTaskNumber);
+
+ // For sum task, we have two summer.
+ int sumTaskNumber = 2;
+ Processor sum = new Processor(Sum.class).withParallelism(sumTaskNumber);
+
+ // construct the graph
+ Graph graph = new Graph();
+ graph.addVertex(split);
+ graph.addVertex(sum);
+
+ Partitioner partitioner = new HashPartitioner();
+ graph.addEdge(split, partitioner, sum);
+
+ UserConfig conf = UserConfig.empty();
+ StreamApplication app = new StreamApplication("wordcountJava", conf, graph);
+
+ EmbeddedCluster localCluster = null;
+
+ Boolean debugMode = System.getProperty("DEBUG") != null;
+
+ if (debugMode) {
+ localCluster = new EmbeddedCluster(akkaConf);
+ localCluster.start();
+ }
+
+ ClientContext masterClient = null;
+
+ if (localCluster != null) {
+ masterClient = localCluster.newClientContext();
+ } else {
+ // create master client
+ // It will read the master settings under gearpump.cluster.masters
+ masterClient = new ClientContext(akkaConf);
+ }
+
+ masterClient.submit(app);
+
+ if (debugMode) {
+ Thread.sleep(30 * 1000); // sleep for 30 seconds.
+ }
+
+ masterClient.close();
+
+ if (localCluster != null) {
+ localCluster.stop();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
new file mode 100644
index 0000000..0ecc42e
--- /dev/null
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcountjava.dsl;
+
+import com.typesafe.config.Config;
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/** Java version of WordCount with high level DSL API */
+public class WordCount {
+
+ public static void main(String[] args) throws InterruptedException {
+ main(ClusterConfig.defaultConfig(), args);
+ }
+
+ public static void main(Config akkaConf, String[] args) throws InterruptedException {
+ ClientContext context = new ClientContext(akkaConf);
+ JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
+ List<String> source = new ArrayList<>(Arrays.asList("This is a good start, bingo!! bingo!!"));
+
+ JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source");
+
+ JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterator<String> apply(String s) {
+ return new ArrayList<String>(Arrays.asList(s.split("\\s+"))).iterator();
+ }
+ }, "flatMap");
+
+ JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
+ @Override
+ public Tuple2<String, Integer> apply(String s) {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }, "map");
+
+ JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() {
+ @Override
+ public String apply(Tuple2<String, Integer> tuple) {
+ return tuple._1();
+ }
+ }, 1, "groupBy");
+
+ JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+ @Override
+ public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+ return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
+ }
+ }, "reduce");
+
+ wordcount.log();
+
+ app.run();
+ context.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala b/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
deleted file mode 100644
index 1a1d019..0000000
--- a/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcount
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.streaming.examples.wordcountjava.WordCount
-
-class WordCountSpec
- extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
-
- before {
- startActorSystem()
- }
-
- after {
- shutdownActorSystem()
- }
-
- protected override def config = TestUtil.DEFAULT_CONFIG
-
- property("WordCount should succeed to submit application with required arguments") {
- val requiredArgs = Array.empty[String]
-
- val masterReceiver = createMockMaster()
-
- val args = requiredArgs
-
- Future {
- WordCount.main(masterConfig, args)
- }
-
- masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
- masterReceiver.reply(SubmitApplicationResult(Success(0)))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
new file mode 100644
index 0000000..3736c86
--- /dev/null
+++ b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcountjava
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.streaming.examples.wordcountjava.WordCount
+
+class WordCountSpec
+ extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+
+ before {
+ startActorSystem()
+ }
+
+ after {
+ shutdownActorSystem()
+ }
+
+ protected override def config = TestUtil.DEFAULT_CONFIG
+
+ property("WordCount should succeed to submit application with required arguments") {
+ val requiredArgs = Array.empty[String]
+
+ val masterReceiver = createMockMaster()
+
+ val args = requiredArgs
+
+ Future {
+ WordCount.main(masterConfig, args)
+ }
+
+ masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+ masterReceiver.reply(SubmitApplicationResult(Success(0)))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala
deleted file mode 100644
index 387bc75..0000000
--- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcount
-
-import java.util.concurrent.TimeUnit
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import taskContext.output
-
- override def onStart(startTime: StartTime): Unit = {
- self ! Message("start")
- }
-
- override def onNext(msg: Message): Unit = {
- Split.TEXT_TO_SPLIT.lines.foreach { line =>
- line.split("[\\s]+").filter(_.nonEmpty).foreach { msg =>
- output(new Message(msg, System.currentTimeMillis()))
- }
- }
-
- import scala.concurrent.duration._
- taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self !
- Message("continue", System.currentTimeMillis()))
- }
-}
-
-object Split {
- val TEXT_TO_SPLIT =
- """
- | Licensed to the Apache Software Foundation (ASF) under one
- | or more contributor license agreements. See the NOTICE file
- | distributed with this work for additional information
- | regarding copyright ownership. The ASF licenses this file
- | to you under the Apache License, Version 2.0 (the
- | "License"); you may not use this file except in compliance
- | with the License. You may obtain a copy of the License at
- |
- | http://www.apache.org/licenses/LICENSE-2.0
- |
- | Unless required by applicable law or agreed to in writing, software
- | distributed under the License is distributed on an "AS IS" BASIS,
- | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- | See the License for the specific language governing permissions and
- | limitations under the License.
- """.stripMargin
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala
deleted file mode 100644
index 6560066..0000000
--- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcount
-
-import java.util.concurrent.TimeUnit
-import scala.collection.mutable
-import scala.concurrent.duration.FiniteDuration
-
-import akka.actor.Cancellable
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- private[wordcount] val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
-
- private[wordcount] var wordCount: Long = 0
- private var snapShotTime: Long = System.currentTimeMillis()
- private var snapShotWordCount: Long = 0
-
- private var scheduler: Cancellable = null
-
- override def onStart(startTime: StartTime): Unit = {
- scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
- new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount)
- }
-
- override def onNext(msg: Message): Unit = {
- if (null != msg) {
- val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L)
- wordCount += 1
- map.put(msg.msg.asInstanceOf[String], current + 1)
- }
- }
-
- override def onStop(): Unit = {
- if (scheduler != null) {
- scheduler.cancel()
- }
- }
-
- def reportWordCount(): Unit = {
- val current: Long = System.currentTimeMillis()
- LOG.info(s"Task ${taskContext.taskId} Throughput:" +
- s" ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)")
- snapShotWordCount = wordCount
- snapShotTime = current
- }
-}
\ No newline at end of file