You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:39 UTC

[29/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
deleted file mode 100644
index ee06b25..0000000
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport.generator
-
-import scala.util.Random
-
-import io.gearpump.streaming.examples.transport.PassRecord
-import io.gearpump.util.LogUtil
-
-class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) {
-  private val LOG = LogUtil.getLogger(getClass)
-  LOG.info(s"Generate pass record for vehicle $vehicleId")
-  private var timeStamp = System.currentTimeMillis()
-
-  private var locationId = city.randomLocationId()
-  private val random = new Random()
-  private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE
-  private val (randomMin, randomRange) = {
-    val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat
-    val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat
-    val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE
-    val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES)
-    val randomRange = upperBound - randomMin
-    (randomMin.toInt, randomRange.toInt)
-  }
-
-  def getNextPassRecord(): PassRecord = {
-    locationId = if (fakePlate) {
-      city.randomLocationId()
-    } else {
-      city.nextLocation(locationId)
-    }
-    timeStamp += (random.nextInt(randomRange) + randomMin)
-    PassRecord(vehicleId, locationId, timeStamp)
-  }
-}
-
-object PassRecordGenerator {
-  final val FAKE_PLATE_RATE = 0.01F
-  final val OVERDRIVE_RATE = 0.05F
-  final val TWOMINUTES = 2 * 60 * 1000
-
-  def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int)
-    : Array[PassRecordGenerator] = {
-    var result = Map.empty[String, PassRecordGenerator]
-    val digitsNum = (Math.log10(generatorNum) + 1).toInt
-    for (i <- 1 to generatorNum) {
-      val vehicleId = prefix + s"%0${digitsNum}d".format(i)
-      val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold)
-      result += vehicleId -> generator
-    }
-    result.values.toArray
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
new file mode 100644
index 0000000..0aaf72c
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+case class LocationInfo(id: String, row: Int, column: Int)
+
+// scalastyle:off equals.hash.code
+case class PassRecord(vehicleId: String, locationId: String, timeStamp: Long) {
+  override def hashCode: Int = vehicleId.hashCode
+}
+// scalastyle:on equals.hash.code
+
+case class GetTrace(vehicleId: String)
+
+case class VehicleTrace(records: Array[PassRecord])
+
+case class OverSpeedReport(vehicleId: String, speed: String, timestamp: Long, locationId: String)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
new file mode 100644
index 0000000..555e850
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+import scala.concurrent.duration._
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+
+class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  import taskContext.{output, parallelism, scheduleOnce, taskId}
+  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
+  private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism
+  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
+  private val mockCity = new MockCity(citySize)
+  private val recordGenerators: Array[PassRecordGenerator] =
+    PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold)
+
+  override def onStart(startTime: StartTime): Unit = {
+    self ! Message("start", System.currentTimeMillis())
+  }
+
+  override def onNext(msg: Message): Unit = {
+    recordGenerators.foreach(generator =>
+      output(Message(generator.getNextPassRecord(), System.currentTimeMillis())))
+    scheduleOnce(1.second)(self ! Message("continue", System.currentTimeMillis()))
+  }
+
+  private def getIdentifier(taskId: TaskId): String = {
+    // scalastyle:off non.ascii.character.disallowed
+    s"\u6caaA${taskId.processorId}${taskId.index}"
+    // scalastyle:on non.ascii.character.disallowed
+  }
+}
+
+object DataSource {
+  final val VEHICLE_NUM = "vehicle.number"
+  final val MOCK_CITY_SIZE = "mock.city.size"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
new file mode 100644
index 0000000..ff3b4b4
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+import akka.actor.Actor._
+import akka.actor.{Actor, ActorRefFactory, Props}
+import akka.io.IO
+import akka.pattern.ask
+import spray.can.Http
+import spray.http.StatusCodes
+import spray.json._
+import spray.routing.{HttpService, Route}
+import upickle.default.write
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
+import org.apache.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer}
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import org.apache.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication}
+import org.apache.gearpump.util.Graph
+
+class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  import system.dispatcher
+  import taskContext.appMaster
+
+  var inspector: (ProcessorId, ProcessorDescription) = null
+  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
+  private var overSpeedRecords = List.empty[OverSpeedReport]
+
+  override def onStart(startTime: StartTime): Unit = {
+    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
+      StreamApplication.DAG).get)
+    inspector = dag.processors.find { kv =>
+      val (_, processor) = kv
+      processor.taskClass == classOf[VelocityInspector].getName
+    }.get
+    taskContext.actorOf(Props(new WebServer))
+  }
+
+  override def onNext(msg: Message): Unit = {
+  }
+
+  /** Serves trace and record queries from the web server; collects over-speed reports. */
+  override def receiveUnManagedMessage: Receive = {
+    case getTrace@GetTrace(vehicleId: String) =>
+      val taskCount = inspector._2.parallelism
+      val processorId = inspector._1
+      val analyzerTaskId = TaskId(processorId, (vehicleId.hashCode & Integer.MAX_VALUE) % taskCount)
+      val requester = sender
+      (appMaster ? LookupTaskActorRef(analyzerTaskId))
+        .asInstanceOf[Future[TaskActorRef]].flatMap { task =>
+        (task.task ? getTrace).asInstanceOf[Future[VehicleTrace]]
+      }.map { trace =>
+        LOG.info(s"reporting $trace")
+        requester ! trace
+      }
+    case record@OverSpeedReport(vehicleId, speed, _, _) =>
+      LOG.info(s"vehicle $vehicleId is over speed, the speed is $speed km/h")
+      overSpeedRecords = record :: overSpeedRecords // O(1) prepend; order is restored on read
+    case GetAllRecords => // reverse restores arrival order, which the stable sort preserves
+      sender ! QueryServer.OverSpeedRecords(overSpeedRecords.reverse.toArray.sortBy(_.timestamp))
+      overSpeedRecords = List.empty[OverSpeedReport]
+    case _ => // Ignore
+  }
+}
+
+object QueryServer {
+  object GetAllRecords
+
+  case class OverSpeedRecords(records: Array[OverSpeedReport])
+
+  class WebServer extends Actor with HttpService {
+
+    import context.dispatcher
+    implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
+    def actorRefFactory: ActorRefFactory = context
+    implicit val system = context.system
+
+    IO(Http) ! Http.Bind(self, interface = "0.0.0.0", port = 8080)
+
+    override def receive: Receive = runRoute(webServer ~ staticRoute)
+
+    def webServer: Route = {
+      path("trace" / Segment) { vehicleId =>
+        get {
+          onComplete((context.parent ? GetTrace(vehicleId)).asInstanceOf[Future[VehicleTrace]]) {
+            case Success(trace: VehicleTrace) =>
+              val json = write(trace)
+              complete(pretty(json))
+            case Failure(ex) => complete(StatusCodes.InternalServerError,
+              s"An error occurred: ${ex.getMessage}")
+          }
+        }
+      } ~
+      path("records") {
+        get {
+          onComplete((context.parent ? GetAllRecords).asInstanceOf[Future[OverSpeedRecords]]) {
+            case Success(records: OverSpeedRecords) =>
+              val json = write(records)
+              complete(pretty(json))
+            case Failure(ex) => complete(StatusCodes.InternalServerError,
+              s"An error occurred: ${ex.getMessage}")
+          }
+        }
+      }
+    }
+
+    val staticRoute = {
+      pathEndOrSingleSlash {
+        getFromResource("transport/transport.html")
+      } ~
+        pathPrefix("css") {
+          get {
+            getFromResourceDirectory("transport/css")
+          }
+        } ~
+        pathPrefix("svg") {
+          get {
+            getFromResourceDirectory("transport/svg")
+          }
+        } ~
+        pathPrefix("js") {
+          get {
+            getFromResourceDirectory("transport/js")
+          }
+        }
+    }
+
+    private def pretty(json: String): String = {
+      json.parseJson.prettyPrint
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
new file mode 100644
index 0000000..5beb2e1
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph._
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/** A city smart transportation streaming application */
+object Transport extends AkkaApp with ArgumentsParser {
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "source" -> CLIOption[Int]("<how many task to generate data>", required = false,
+      defaultValue = Some(10)),
+    "inspector" -> CLIOption[Int]("<how many over speed inspector>", required = false,
+      defaultValue = Some(4)),
+    "vehicle" -> CLIOption[Int]("<how many vehicles's to generate>", required = false,
+      defaultValue = Some(1000)),
+    "citysize" -> CLIOption[Int]("<the blocks number of the mock city>", required = false,
+      defaultValue = Some(10)),
+    "threshold" -> CLIOption[Int]("<overdrive threshold, km/h>", required = false,
+      defaultValue = Some(60)))
+
+  def application(config: ParseResult): StreamApplication = {
+    val sourceNum = config.getInt("source")
+    val inspectorNum = config.getInt("inspector")
+    val vehicleNum = config.getInt("vehicle")
+    val citysize = config.getInt("citysize")
+    val threshold = config.getInt("threshold")
+    val source = Processor[DataSource](sourceNum)
+    val inspector = Processor[VelocityInspector](inspectorNum)
+    val queryServer = Processor[QueryServer](1)
+    val partitioner = new HashPartitioner
+
+    val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
+      withInt(DataSource.MOCK_CITY_SIZE, citysize).
+      withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold).
+      withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
+    StreamApplication("transport", Graph(source ~ partitioner ~> inspector,
+      Node(queryServer)), userConfig)
+  }
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+    implicit val system = context.system
+    context.submit(application(config))
+    context.close()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
new file mode 100644
index 0000000..4d9bd04
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+import java.util.concurrent.TimeUnit
+import scala.collection.immutable.Queue
+import scala.collection.mutable
+import scala.concurrent.Future
+
+import akka.actor.Actor._
+import akka.actor.ActorRef
+import akka.pattern.ask
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
+import org.apache.gearpump.streaming.examples.transport.generator.MockCity
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import org.apache.gearpump.streaming.{DAG, ProcessorDescription, StreamApplication}
+import org.apache.gearpump.util.Graph
+
+class VelocityInspector(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+
+  import system.dispatcher
+  import taskContext.appMaster
+  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
+  private val passRecords = mutable.Map.empty[String, Queue[PassRecord]]
+  private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get
+  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
+  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
+  private val mockCity = new MockCity(citySize)
+  private var queryServerActor: ActorRef = null
+
+  override def onStart(startTime: StartTime): Unit = {
+    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
+      StreamApplication.DAG).get)
+    val queryServer = dag.processors.find { kv =>
+      val (_, processor) = kv
+      processor.taskClass == classOf[QueryServer].getName
+    }.get
+    val queryServerTaskId = TaskId(queryServer._1, 0)
+    (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]]
+      .map { task =>
+        queryServerActor = task.task
+      }
+  }
+
+  import org.apache.gearpump.streaming.examples.transport.VelocityInspector._
+  override def onNext(msg: Message): Unit = {
+    msg.msg match {
+      case passRecord: PassRecord =>
+        val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord])
+        if (records.size > 0) {
+          val velocity = getVelocity(passRecord, records.last)
+          val formatted = "%.2f".format(velocity)
+          if (velocity > overdriveThreshold) {
+            if (velocity > fakePlateThreshold) {
+              LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, " +
+                s"the speed is $formatted km/h")
+            }
+            if (queryServerActor != null) {
+              queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted,
+                passRecord.timeStamp, passRecord.locationId)
+            }
+          }
+        }
+        passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM))
+    }
+  }
+
+  override def receiveUnManagedMessage: Receive = {
+    case GetTrace(vehicleId) =>
+      val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord])
+      sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp))
+  }
+
+  private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = {
+    val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId)
+    val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60)
+    distanceInKm / timeInHour
+  }
+
+  private def getDistance(location1: String, location2: String): Long = {
+    mockCity.getDistance(location1, location2)
+  }
+}
+
+object VelocityInspector {
+  final val OVER_DRIVE_THRESHOLD = "overdrive.threshold"
+  final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold"
+  final val RECORDS_NUM = 20
+
+  class FiniteQueue[T](q: Queue[T]) {
+    def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = {
+      var result = q.enqueue(elem)
+      while (result.size > maxSize) {
+        result = result.dequeue._2
+      }
+      result
+    }
+  }
+
+  import scala.language.implicitConversions
+
+  implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
new file mode 100644
index 0000000..60e0bcf
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport.generator
+
+import scala.util.Random
+
+import org.apache.gearpump.streaming.examples.transport.generator.MockCity._
+
+class MockCity(size: Int) {
+  private val random = new Random()
+  private val directions = Array(UP, DOWN, LEFT, RIGHT)
+
+  /** Moves to a uniformly chosen in-city neighbour; stays put if none exists (e.g. size <= 1). */
+  def nextLocation(currentLocationId: String): String = {
+    val current = idToCoordinate(currentLocationId)
+    val neighbours = directions.map(current.addOffset).filter(inCity)
+    if (neighbours.isEmpty) {
+      currentLocationId
+    } else {
+      coordinateToId(neighbours(random.nextInt(neighbours.length)))
+    }
+  }
+
+  def getDistance(locationId1: String, locationId2: String): Long = {
+    val coordinate1 = idToCoordinate(locationId1)
+    val coordinate2 = idToCoordinate(locationId2)
+    val blocks = Math.abs(coordinate1.row - coordinate2.row) +
+      Math.abs(coordinate1.column - coordinate2.column)
+    blocks * LENGTH_PER_BLOCK
+  }
+
+  def randomLocationId(): String = {
+    val row = random.nextInt(size)
+    val column = random.nextInt(size)
+    coordinateToId(Coordinate(row, column))
+  }
+
+  private def coordinateToId(coordinate: Coordinate): String = {
+    s"Id_${coordinate.row}_${coordinate.column}"
+  }
+
+  private def idToCoordinate(locationId: String): Coordinate = {
+    val attr = locationId.split("_")
+    val row = attr(1).toInt
+    val column = attr(2).toInt
+    Coordinate(row, column)
+  }
+
+  private def inCity(coordinate: Coordinate): Boolean = {
+    coordinate.row >= 0 &&
+      coordinate.row < size &&
+      coordinate.column >= 0 &&
+      coordinate.column < size
+  }
+}
+
+object MockCity {
+  // The length of one block of the mock city, km
+  final val LENGTH_PER_BLOCK = 5
+  // The minimal speed, km/h
+  final val MINIMAL_SPEED = 10
+
+  final val UP = Coordinate(0, 1)
+  final val DOWN = Coordinate(0, -1)
+  final val LEFT = Coordinate(-1, 0)
+  final val RIGHT = Coordinate(1, 0)
+
+  case class Coordinate(row: Int, column: Int) {
+    def addOffset(coordinate: Coordinate): Coordinate = {
+      Coordinate(this.row + coordinate.row, this.column + coordinate.column)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
new file mode 100644
index 0000000..e8c1c59
--- /dev/null
+++ b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport.generator
+
+import scala.util.Random
+
+import org.apache.gearpump.streaming.examples.transport.PassRecord
+import org.apache.gearpump.util.LogUtil
+
+class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) {
+  private val LOG = LogUtil.getLogger(getClass)
+  LOG.info(s"Generate pass record for vehicle $vehicleId")
+  private var timeStamp = System.currentTimeMillis()
+
+  private var locationId = city.randomLocationId()
+  private val random = new Random()
+  private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE
+  private val (randomMin, randomRange) = {
+    val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat
+    val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat
+    val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE
+    val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES)
+    val randomRange = upperBound - randomMin
+    (randomMin.toInt, randomRange.toInt)
+  }
+
+  def getNextPassRecord(): PassRecord = {
+    locationId = if (fakePlate) {
+      city.randomLocationId()
+    } else {
+      city.nextLocation(locationId)
+    }
+    timeStamp += (random.nextInt(randomRange) + randomMin)
+    PassRecord(vehicleId, locationId, timeStamp)
+  }
+}
+
+object PassRecordGenerator {
+  final val FAKE_PLATE_RATE = 0.01F
+  final val OVERDRIVE_RATE = 0.05F
+  final val TWOMINUTES = 2 * 60 * 1000
+
+  /**
+   * Creates `generatorNum` generators whose ids are `prefix` plus a zero-padded 1-based index.
+   * Generators are returned in index order; a non-positive `generatorNum` yields an empty array.
+   */
+  def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int)
+    : Array[PassRecordGenerator] = {
+    // Pad to the width of the largest index so that all ids have the same length.
+    val idFormat = s"%0${generatorNum.toString.length}d"
+    Array.tabulate(math.max(generatorNum, 0)) { i =>
+      new PassRecordGenerator(prefix + idFormat.format(i + 1), city, overdriveThreshold)
+    }
+  }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
deleted file mode 100644
index 75f9d60..0000000
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/DataSourceSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-
-class DataSourceSpec extends FlatSpec with Matchers {
-  it should "create the pass record" in {
-    val vehicleNum = 2
-    val context = MockUtil.mockTaskContext
-
-    val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
-      withInt(DataSource.MOCK_CITY_SIZE, 10).
-      withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, 60).
-      withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
-
-    val source = new DataSource(context, userConfig)
-    source.onStart(StartTime(0))
-    source.onNext(Message("start"))
-    verify(context, times(vehicleNum)).output(any[Message])
-    source.onStop()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
deleted file mode 100644
index b61fd43..0000000
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/TransportSpec.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class TransportSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
-
-  override def beforeAll {
-    startActorSystem()
-  }
-
-  override def afterAll {
-    shutdownActorSystem()
-  }
-
-  protected override def config = TestUtil.DEFAULT_CONFIG
-
-  property("Transport should succeed to submit application with required arguments") {
-    val requiredArgs = Array.empty[String]
-    val optionalArgs = Array(
-      "-source", "1",
-      "-inspector", "1",
-      "-vehicle", "100",
-      "-citysize", "10",
-      "-threshold", "60")
-
-    val args = {
-      Table(
-        ("requiredArgs", "optionalArgs"),
-        (requiredArgs, optionalArgs)
-      )
-    }
-    val masterReceiver = createMockMaster()
-    forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
-      val args = requiredArgs ++ optionalArgs
-
-      Future {
-        Transport.main(masterConfig, args)
-      }
-      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
-      masterReceiver.reply(SubmitApplicationResult(Success(0)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
deleted file mode 100644
index e91d91c..0000000
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport.generator
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class MockCitySpec extends PropSpec with PropertyChecks with Matchers {
-
-  property("MockCity should maintain the location properly") {
-    val city = new MockCity(10)
-    val start = city.randomLocationId()
-    val nextLocation = city.nextLocation(start)
-    assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala b/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
deleted file mode 100644
index 1c1901e..0000000
--- a/examples/streaming/transport/src/test/scala/io/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.transport.generator
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers {
-
-  property("PassRecordGenerator should generate pass record") {
-    val id = "test"
-    val city = new MockCity(10)
-    val generator = new PassRecordGenerator(id, city, 60)
-    val passrecord1 = generator.getNextPassRecord()
-    val passrecord2 = generator.getNextPassRecord()
-    assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) ==
-      MockCity.LENGTH_PER_BLOCK)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
new file mode 100644
index 0000000..1f525ae
--- /dev/null
+++ b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+
+class DataSourceSpec extends FlatSpec with Matchers {
+  it should "create the pass record" in {
+    val vehicleNum = 2
+    val taskContext = MockUtil.mockTaskContext
+    val conf = UserConfig.empty
+      .withInt(DataSource.VEHICLE_NUM, vehicleNum)
+      .withInt(DataSource.MOCK_CITY_SIZE, 10)
+      .withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, 60)
+      .withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
+    val dataSource = new DataSource(taskContext, conf)
+    dataSource.onStart(StartTime(0))
+    dataSource.onNext(Message("start"))
+    // A single onNext must have called output once per configured vehicle.
+    verify(taskContext, times(vehicleNum)).output(any[Message])
+    dataSource.onStop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
new file mode 100644
index 0000000..2f83de5
--- /dev/null
+++ b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+
+class TransportSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
+
+  override def beforeAll(): Unit = {
+    startActorSystem()
+  }
+
+  override def afterAll(): Unit = {
+    shutdownActorSystem()
+  }
+
+  protected override def config = TestUtil.DEFAULT_CONFIG
+
+  property("Transport should succeed to submit application with required arguments") {
+    val requiredArgs = Array.empty[String]
+    val optionalArgs = Array(
+      "-source", "1",
+      "-inspector", "1",
+      "-vehicle", "100",
+      "-citysize", "10",
+      "-threshold", "60")
+
+    // One row per argument combination to submit with.
+    val argCombinations = Table(
+      ("requiredArgs", "optionalArgs"),
+      (requiredArgs, optionalArgs)
+    )
+    val masterReceiver = createMockMaster()
+    forAll(argCombinations) { (required: Array[String], optional: Array[String]) =>
+      val submitArgs = required ++ optional
+
+      // Run the submission asynchronously; this thread then answers as the mock master.
+      Future {
+        Transport.main(masterConfig, submitArgs)
+      }
+      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+      masterReceiver.reply(SubmitApplicationResult(Success(0)))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
new file mode 100644
index 0000000..ba4eb2d
--- /dev/null
+++ b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport.generator
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class MockCitySpec extends PropSpec with PropertyChecks with Matchers {
+  property("MockCity should maintain the location properly") {
+    val mockCity = new MockCity(10)
+    val origin = mockCity.randomLocationId()
+    // One nextLocation step is expected to cover exactly one block length.
+    val travelled = mockCity.getDistance(origin, mockCity.nextLocation(origin))
+    assert(travelled == MockCity.LENGTH_PER_BLOCK)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
new file mode 100644
index 0000000..f0eebbf
--- /dev/null
+++ b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.transport.generator
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers {
+  property("PassRecordGenerator should generate pass record") {
+    val mockCity = new MockCity(10)
+    val recordGenerator = new PassRecordGenerator("test", mockCity, 60)
+    val first = recordGenerator.getNextPassRecord()
+    val second = recordGenerator.getNextPassRecord()
+    // NOTE(review): presumably holds only for non-fake plates - confirm generator setup.
+    // Consecutive records are expected to be exactly one block apart.
+    val gap = mockCity.getDistance(first.locationId, second.locationId)
+    assert(gap == MockCity.LENGTH_PER_BLOCK)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
deleted file mode 100644
index 720e179..0000000
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Split.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcountjava;
-
-import io.gearpump.Message;
-import io.gearpump.cluster.UserConfig;
-import io.gearpump.streaming.javaapi.Task;
-import io.gearpump.streaming.task.StartTime;
-import io.gearpump.streaming.task.TaskContext;
-
-public class Split extends Task {
-
-  public static String TEXT = "This is a good start for java! bingo! bingo! ";
-
-  public Split(TaskContext taskContext, UserConfig userConf) {
-    super(taskContext, userConf);
-  }
-
-  private Long now() {
-    return System.currentTimeMillis();
-  }
-
-  @Override
-  public void onStart(StartTime startTime) {
-    self().tell(new Message("start", now()), self());
-  }
-
-  @Override
-  public void onNext(Message msg) {
-
-    // Split the TEXT to words
-    String[] words = TEXT.split(" ");
-    for (int i = 0; i < words.length; i++) {
-      context.output(new Message(words[i], now()));
-    }
-    self().tell(new Message("next", now()), self());
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
deleted file mode 100644
index 28cf8cb..0000000
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/Sum.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcountjava;
-
-import io.gearpump.Message;
-import io.gearpump.cluster.UserConfig;
-import io.gearpump.streaming.javaapi.Task;
-import io.gearpump.streaming.task.StartTime;
-import io.gearpump.streaming.task.TaskContext;
-import org.slf4j.Logger;
-
-import java.util.HashMap;
-
-public class Sum extends Task {
-
-  private Logger LOG = super.LOG();
-  private HashMap<String, Integer> wordCount = new HashMap<String, Integer>();
-
-  public Sum(TaskContext taskContext, UserConfig userConf) {
-    super(taskContext, userConf);
-  }
-
-  @Override
-  public void onStart(StartTime startTime) {
-    //skip
-  }
-
-  @Override
-  public void onNext(Message messagePayLoad) {
-    String word = (String) (messagePayLoad.msg());
-    Integer current = wordCount.get(word);
-    if (current == null) {
-      current = 0;
-    }
-    Integer newCount = current + 1;
-    wordCount.put(word, newCount);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
deleted file mode 100644
index 40054d3..0000000
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcountjava;
-
-import com.typesafe.config.Config;
-import io.gearpump.cluster.ClusterConfig;
-import io.gearpump.cluster.UserConfig;
-import io.gearpump.cluster.client.ClientContext;
-import io.gearpump.cluster.embedded.EmbeddedCluster;
-import io.gearpump.partitioner.HashPartitioner;
-import io.gearpump.partitioner.Partitioner;
-import io.gearpump.streaming.javaapi.Graph;
-import io.gearpump.streaming.javaapi.Processor;
-import io.gearpump.streaming.javaapi.StreamApplication;
-
-/** Java version of WordCount with Processor Graph API */
-public class WordCount {
-
-  public static void main(String[] args) throws InterruptedException {
-    main(ClusterConfig.defaultConfig(), args);
-  }
-
-  public static void main(Config akkaConf, String[] args) throws InterruptedException {
-
-    // For split task, we config to create two tasks
-    int splitTaskNumber = 2;
-    Processor split = new Processor(Split.class).withParallelism(splitTaskNumber);
-
-    // For sum task, we have two summer.
-    int sumTaskNumber = 2;
-    Processor sum = new Processor(Sum.class).withParallelism(sumTaskNumber);
-
-    // construct the graph
-    Graph graph = new Graph();
-    graph.addVertex(split);
-    graph.addVertex(sum);
-
-    Partitioner partitioner = new HashPartitioner();
-    graph.addEdge(split, partitioner, sum);
-
-    UserConfig conf = UserConfig.empty();
-    StreamApplication app = new StreamApplication("wordcountJava", conf, graph);
-
-    EmbeddedCluster localCluster = null;
-
-    Boolean debugMode = System.getProperty("DEBUG") != null;
-
-    if (debugMode) {
-      localCluster = new EmbeddedCluster(akkaConf);
-      localCluster.start();
-    }
-
-    ClientContext masterClient = null;
-
-    if (localCluster != null) {
-      masterClient = localCluster.newClientContext();
-    } else {
-      // create master client
-      // It will read the master settings under gearpump.cluster.masters
-      masterClient = new ClientContext(akkaConf);
-    }
-
-    masterClient.submit(app);
-
-    if (debugMode) {
-      Thread.sleep(30 * 1000); // sleep for 30 seconds.
-    }
-
-    masterClient.close();
-
-    if (localCluster != null) {
-      localCluster.stop();
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
deleted file mode 100644
index 3aefd7f..0000000
--- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcountjava.dsl;
-
-import com.typesafe.config.Config;
-import io.gearpump.cluster.ClusterConfig;
-import io.gearpump.cluster.UserConfig;
-import io.gearpump.cluster.client.ClientContext;
-import io.gearpump.google.common.collect.Lists;
-import io.gearpump.streaming.dsl.javaapi.JavaStream;
-import io.gearpump.streaming.dsl.javaapi.JavaStreamApp;
-import io.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
-import io.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
-import io.gearpump.streaming.javaapi.dsl.functions.MapFunction;
-import io.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
-import scala.Tuple2;
-
-import java.util.Iterator;
-import java.util.List;
-
-/** Java version of WordCount with high level DSL API */
-public class WordCount {
-
-  public static void main(String[] args) throws InterruptedException {
-    main(ClusterConfig.defaultConfig(), args);
-  }
-
-  public static void main(Config akkaConf, String[] args) throws InterruptedException {
-    ClientContext context = new ClientContext(akkaConf);
-    JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
-    List<String> source = Lists.newArrayList("This is a good start, bingo!! bingo!!");
-
-    JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source");
-
-    JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> apply(String s) {
-        return Lists.newArrayList(s.split("\\s+")).iterator();
-      }
-    }, "flatMap");
-
-    JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
-      @Override
-      public Tuple2<String, Integer> apply(String s) {
-        return new Tuple2<String, Integer>(s, 1);
-      }
-    }, "map");
-
-    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() {
-      @Override
-      public String apply(Tuple2<String, Integer> tuple) {
-        return tuple._1();
-      }
-    }, 1, "groupBy");
-
-    JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
-      @Override
-      public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
-        return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
-      }
-    }, "reduce");
-
-    wordcount.log();
-
-    app.run();
-    context.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
new file mode 100644
index 0000000..76069c1
--- /dev/null
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcountjava;
+
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.javaapi.Task;
+import org.apache.gearpump.streaming.task.StartTime;
+import org.apache.gearpump.streaming.task.TaskContext;
+
+/**
+ * Source task of the Java word count example.
+ *
+ * <p>Every handled message makes the task output one message per word of {@link #TEXT}
+ * and then send a new message to itself, so the words are emitted over and over.
+ */
+public class Split extends Task {
+
+  /** Sentence whose words are emitted on every round. */
+  public static final String TEXT = "This is a good start for java! bingo! bingo! ";
+
+  /** Words of {@link #TEXT}, split once up front because the text is constant. */
+  private static final String[] WORDS = TEXT.split(" ");
+
+  public Split(TaskContext taskContext, UserConfig userConf) {
+    super(taskContext, userConf);
+  }
+
+  /** Current wall-clock time in milliseconds; passed as the second argument of each Message. */
+  private long now() {
+    return System.currentTimeMillis();
+  }
+
+  /** Starts the emit loop by sending the first "start" message to this task itself. */
+  @Override
+  public void onStart(StartTime startTime) {
+    self().tell(new Message("start", now()), self());
+  }
+
+  /**
+   * Outputs one message per word, then sends itself a "next" message to trigger another round.
+   *
+   * @param msg the triggering message; its content is not inspected
+   */
+  @Override
+  public void onNext(Message msg) {
+    for (String word : WORDS) {
+      context.output(new Message(word, now()));
+    }
+    self().tell(new Message("next", now()), self());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
new file mode 100644
index 0000000..89c3b14
--- /dev/null
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcountjava;
+
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.javaapi.Task;
+import org.apache.gearpump.streaming.task.StartTime;
+import org.apache.gearpump.streaming.task.TaskContext;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+
+/**
+ * Counting task of the Java word count example.
+ *
+ * <p>Keeps an in-memory tally of how many times each word has been received.
+ */
+public class Sum extends Task {
+
+  private Logger LOG = super.LOG();
+
+  /** Number of occurrences seen so far, keyed by word. */
+  private HashMap<String, Integer> wordCount = new HashMap<String, Integer>();
+
+  public Sum(TaskContext taskContext, UserConfig userConf) {
+    super(taskContext, userConf);
+  }
+
+  /** Nothing to prepare: counting begins with the first message. */
+  @Override
+  public void onStart(StartTime startTime) {
+    // intentionally empty
+  }
+
+  /**
+   * Adds one to the tally of the word carried by the message.
+   *
+   * @param messagePayLoad message whose payload is cast to the {@code String} word to count
+   */
+  @Override
+  public void onNext(Message messagePayLoad) {
+    String key = (String) messagePayLoad.msg();
+    Integer seen = wordCount.get(key);
+    int updated = (seen == null) ? 1 : seen + 1;
+    wordCount.put(key, updated);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
new file mode 100644
index 0000000..ee44536
--- /dev/null
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcountjava;
+
+import com.typesafe.config.Config;
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.apache.gearpump.partitioner.HashPartitioner;
+import org.apache.gearpump.partitioner.Partitioner;
+import org.apache.gearpump.streaming.javaapi.Graph;
+import org.apache.gearpump.streaming.javaapi.Processor;
+import org.apache.gearpump.streaming.javaapi.StreamApplication;
+
+/** Java version of WordCount with Processor Graph API */
+public class WordCount {
+
+  /** How long, in milliseconds, the debug-mode run waits before shutting down. */
+  private static final long DEBUG_RUN_MILLIS = 30 * 1000L;
+
+  /**
+   * Runs the example with {@code ClusterConfig.defaultConfig()}.
+   *
+   * @param args command line arguments, forwarded unchanged
+   * @throws InterruptedException if the debug-mode wait is interrupted
+   */
+  public static void main(String[] args) throws InterruptedException {
+    main(ClusterConfig.defaultConfig(), args);
+  }
+
+  /**
+   * Builds the split-to-sum graph and submits it as the "wordcountJava" application.
+   *
+   * <p>If the system property {@code DEBUG} is set, an {@link EmbeddedCluster} is started, the
+   * application is submitted to it, and everything is shut down after a 30 second wait.
+   * Otherwise the application is submitted through a {@link ClientContext} built from
+   * {@code akkaConf}.
+   *
+   * @param akkaConf configuration handed to the embedded cluster or the client
+   * @param args command line arguments; not read by this example
+   * @throws InterruptedException if the debug-mode wait is interrupted
+   */
+  public static void main(Config akkaConf, String[] args) throws InterruptedException {
+
+    // For split task, we config to create two tasks
+    int splitTaskNumber = 2;
+    Processor split = new Processor(Split.class).withParallelism(splitTaskNumber);
+
+    // For sum task, we have two summers.
+    int sumTaskNumber = 2;
+    Processor sum = new Processor(Sum.class).withParallelism(sumTaskNumber);
+
+    // construct the graph
+    Graph graph = new Graph();
+    graph.addVertex(split);
+    graph.addVertex(sum);
+
+    Partitioner partitioner = new HashPartitioner();
+    graph.addEdge(split, partitioner, sum);
+
+    UserConfig conf = UserConfig.empty();
+    StreamApplication app = new StreamApplication("wordcountJava", conf, graph);
+
+    boolean debugMode = System.getProperty("DEBUG") != null;
+
+    EmbeddedCluster localCluster = null;
+    ClientContext masterClient = null;
+    try {
+      if (debugMode) {
+        localCluster = new EmbeddedCluster(akkaConf);
+        localCluster.start();
+        masterClient = localCluster.newClientContext();
+      } else {
+        // create master client
+        // It will read the master settings under gearpump.cluster.masters
+        masterClient = new ClientContext(akkaConf);
+      }
+
+      masterClient.submit(app);
+
+      if (debugMode) {
+        Thread.sleep(DEBUG_RUN_MILLIS); // keep the embedded cluster alive for 30 seconds
+      }
+    } finally {
+      // Always release the client and the embedded cluster: without this, a failed submit or
+      // an interrupted wait would skip close()/stop().
+      try {
+        if (masterClient != null) {
+          masterClient.close();
+        }
+      } finally {
+        if (localCluster != null) {
+          localCluster.stop();
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
new file mode 100644
index 0000000..0ecc42e
--- /dev/null
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcountjava.dsl;
+
+import com.typesafe.config.Config;
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/** Java version of WordCount with high level DSL API */
+public class WordCount {
+
+  /**
+   * Runs the example with {@code ClusterConfig.defaultConfig()}.
+   *
+   * @param args command line arguments, forwarded unchanged
+   */
+  public static void main(String[] args) throws InterruptedException {
+    main(ClusterConfig.defaultConfig(), args);
+  }
+
+  /**
+   * Builds and runs the "JavaDSL" word count: one sentence is split on whitespace, every word is
+   * paired with a count of 1, the pairs are grouped by word and the counts are summed per group.
+   *
+   * @param akkaConf configuration used to create the {@link ClientContext}
+   * @param args command line arguments; not read by this example
+   */
+  public static void main(Config akkaConf, String[] args) throws InterruptedException {
+    ClientContext context = new ClientContext(akkaConf);
+    try {
+      JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
+      List<String> source = new ArrayList<>(Arrays.asList("This is a good start, bingo!! bingo!!"));
+
+      JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source");
+
+      // sentence -> words, split on runs of whitespace
+      JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() {
+        @Override
+        public Iterator<String> apply(String s) {
+          return new ArrayList<String>(Arrays.asList(s.split("\\s+"))).iterator();
+        }
+      }, "flatMap");
+
+      // word -> (word, 1)
+      JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
+        @Override
+        public Tuple2<String, Integer> apply(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }, "map");
+
+      // group the pairs by their word
+      JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() {
+        @Override
+        public String apply(Tuple2<String, Integer> tuple) {
+          return tuple._1();
+        }
+      }, 1, "groupBy");
+
+      // add up the counts of two pairs, keeping the first pair's word
+      JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+        @Override
+        public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+          return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
+        }
+      }, "reduce");
+
+      wordcount.log();
+
+      app.run();
+    } finally {
+      // Close the client even when building or running the application throws.
+      context.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala b/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
deleted file mode 100644
index 1a1d019..0000000
--- a/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcount
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.streaming.examples.wordcountjava.WordCount
-
-class WordCountSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
-
-  before {
-    startActorSystem()
-  }
-
-  after {
-    shutdownActorSystem()
-  }
-
-  protected override def config = TestUtil.DEFAULT_CONFIG
-
-  property("WordCount should succeed to submit application with required arguments") {
-    val requiredArgs = Array.empty[String]
-
-    val masterReceiver = createMockMaster()
-
-    val args = requiredArgs
-
-    Future {
-      WordCount.main(masterConfig, args)
-    }
-
-    masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
-    masterReceiver.reply(SubmitApplicationResult(Success(0)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
new file mode 100644
index 0000000..3736c86
--- /dev/null
+++ b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcountjava
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.streaming.examples.wordcountjava.WordCount
+
+/**
+ * Checks that the Java WordCount example submits an application to a (mocked) master.
+ */
+class WordCountSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+
+  before {
+    startActorSystem()
+  }
+
+  after {
+    shutdownActorSystem()
+  }
+
+  protected override def config = TestUtil.DEFAULT_CONFIG
+
+  property("WordCount should succeed to submit application with required arguments") {
+    // The example is started without any command line options.
+    val noArgs = Array.empty[String]
+    val mockMaster = createMockMaster()
+
+    // Launch the client asynchronously; this thread then answers on behalf of the master.
+    Future {
+      WordCount.main(masterConfig, noArgs)
+    }
+
+    mockMaster.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+    mockMaster.reply(SubmitApplicationResult(Success(0)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala
deleted file mode 100644
index 387bc75..0000000
--- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcount
-
-import java.util.concurrent.TimeUnit
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import taskContext.output
-
-  override def onStart(startTime: StartTime): Unit = {
-    self ! Message("start")
-  }
-
-  override def onNext(msg: Message): Unit = {
-    Split.TEXT_TO_SPLIT.lines.foreach { line =>
-      line.split("[\\s]+").filter(_.nonEmpty).foreach { msg =>
-        output(new Message(msg, System.currentTimeMillis()))
-      }
-    }
-
-    import scala.concurrent.duration._
-    taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self !
-      Message("continue", System.currentTimeMillis()))
-  }
-}
-
-object Split {
-  val TEXT_TO_SPLIT =
-    """
-      |   Licensed to the Apache Software Foundation (ASF) under one
-      |   or more contributor license agreements.  See the NOTICE file
-      |   distributed with this work for additional information
-      |   regarding copyright ownership.  The ASF licenses this file
-      |   to you under the Apache License, Version 2.0 (the
-      |   "License"); you may not use this file except in compliance
-      |   with the License.  You may obtain a copy of the License at
-      |
-      |       http://www.apache.org/licenses/LICENSE-2.0
-      |
-      |   Unless required by applicable law or agreed to in writing, software
-      |   distributed under the License is distributed on an "AS IS" BASIS,
-      |   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-      |   See the License for the specific language governing permissions and
-      |   limitations under the License.
-    """.stripMargin
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala
deleted file mode 100644
index 6560066..0000000
--- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.wordcount
-
-import java.util.concurrent.TimeUnit
-import scala.collection.mutable
-import scala.concurrent.duration.FiniteDuration
-
-import akka.actor.Cancellable
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  private[wordcount] val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
-
-  private[wordcount] var wordCount: Long = 0
-  private var snapShotTime: Long = System.currentTimeMillis()
-  private var snapShotWordCount: Long = 0
-
-  private var scheduler: Cancellable = null
-
-  override def onStart(startTime: StartTime): Unit = {
-    scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
-      new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount)
-  }
-
-  override def onNext(msg: Message): Unit = {
-    if (null != msg) {
-      val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L)
-      wordCount += 1
-      map.put(msg.msg.asInstanceOf[String], current + 1)
-    }
-  }
-
-  override def onStop(): Unit = {
-    if (scheduler != null) {
-      scheduler.cancel()
-    }
-  }
-
-  def reportWordCount(): Unit = {
-    val current: Long = System.currentTimeMillis()
-    LOG.info(s"Task ${taskContext.taskId} Throughput:" +
-      s" ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)")
-    snapShotWordCount = wordCount
-    snapShotTime = current
-  }
-}
\ No newline at end of file