You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/08/15 15:04:32 UTC

[1/3] incubator-gearpump git commit: [GEARPUMP-188] use java.time.Instant for Task start time

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 6d919ec97 -> 23daf0cf9


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
index 2efce45..89018a1 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
@@ -21,9 +21,10 @@ package org.apache.gearpump.streaming.javaapi;
 import akka.actor.ActorRef;
 import org.apache.gearpump.Message;
 import org.apache.gearpump.cluster.UserConfig;
-import org.apache.gearpump.streaming.task.StartTime;
 import org.apache.gearpump.streaming.task.TaskContext;
 
+import java.time.Instant;
+
 /**
  * Java version of Task.
  *
@@ -45,7 +46,7 @@ public class Task extends org.apache.gearpump.streaming.task.Task {
   }
 
   @Override
-  public void onStart(StartTime startTime) {
+  public void onStart(Instant startTime) {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
index 5027500..b6c087e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.streaming.dsl
 
+import java.time.Instant
+
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
@@ -27,7 +29,7 @@ import org.apache.gearpump.streaming.dsl.plan.Planner
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
 
 import scala.language.implicitConversions
 
@@ -69,36 +71,36 @@ object StreamApp {
   }
 
   implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = {
-    streamApp.plan
+    streamApp.plan()
   }
 
   implicit class Source(app: StreamApp) extends java.io.Serializable {
 
-    def source[T](dataSource: DataSource, parallism: Int): Stream[T] = {
-      source(dataSource, parallism, UserConfig.empty)
+    def source[T](dataSource: DataSource, parallelism: Int): Stream[T] = {
+      source(dataSource, parallelism, UserConfig.empty)
     }
 
-    def source[T](dataSource: DataSource, parallism: Int, description: String): Stream[T] = {
-      source(dataSource, parallism, UserConfig.empty, description)
+    def source[T](dataSource: DataSource, parallelism: Int, description: String): Stream[T] = {
+      source(dataSource, parallelism, UserConfig.empty, description)
     }
 
-    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig): Stream[T] = {
-      source(dataSource, parallism, conf, description = null)
+    def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig): Stream[T] = {
+      source(dataSource, parallelism, conf, description = null)
     }
 
-    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig, description: String)
+    def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String)
       : Stream[T] = {
-      implicit val sourceOp = DataSourceOp(dataSource, parallism, conf, description)
+      implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description)
       app.graph.addVertex(sourceOp)
       new Stream[T](app.graph, sourceOp)
     }
-    def source[T](seq: Seq[T], parallism: Int, description: String): Stream[T] = {
-      this.source(new CollectionDataSource[T](seq), parallism, UserConfig.empty, description)
+    def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
+      this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
     }
 
-    def source[T](source: Class[_ <: Task], parallism: Int, conf: UserConfig, description: String)
+    def source[T](source: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String)
       : Stream[T] = {
-      val sourceOp = ProcessorOp(source, parallism, conf, Option(description).getOrElse("source"))
+      val sourceOp = ProcessorOp(source, parallelism, conf, Option(description).getOrElse("source"))
       app.graph.addVertex(sourceOp)
       new Stream[T](app.graph, sourceOp)
     }
@@ -119,5 +121,5 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
 
   override def close(): Unit = {}
 
-  override def open(context: TaskContext, startTime: TimeStamp): Unit = {}
+  override def open(context: TaskContext, startTime: Instant): Unit = {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
index 56d31db..6bd0da2 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
@@ -18,11 +18,11 @@
 
 package org.apache.gearpump.streaming.dsl.plan
 
-import scala.collection.TraversableOnce
+import java.time.Instant
 
+import scala.collection.TraversableOnce
 import akka.actor.ActorSystem
 import org.slf4j.Logger
-
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
@@ -32,7 +32,7 @@ import org.apache.gearpump.streaming.dsl.op._
 import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.LogUtil
 
 /**
@@ -116,7 +116,7 @@ object OpTranslator {
 
   class DummyInputFunction[T] extends SingleInputFunction[T, T] {
     override def andThen[OUTER](other: SingleInputFunction[T, OUTER])
-      : SingleInputFunction[T, OUTER] = {
+    : SingleInputFunction[T, OUTER] = {
       other
     }
 
@@ -131,13 +131,13 @@ object OpTranslator {
     extends SingleInputFunction[IN, OUT] {
 
     override def process(value: IN): TraversableOnce[OUT] = {
-      first.process(value).flatMap(second.process(_))
+      first.process(value).flatMap(second.process)
     }
 
     override def description: String = {
       Option(first.description).flatMap { description =>
         Option(second.description).map(description + "." + _)
-      }.getOrElse(null)
+      }.orNull
     }
   }
 
@@ -182,9 +182,6 @@ object OpTranslator {
 
     private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]]
 
-    override def onStart(startTime: StartTime): Unit = {
-    }
-
     override def onNext(msg: Message): Unit = {
       val time = msg.timestamp
 
@@ -216,8 +213,8 @@ object OpTranslator {
         taskContext, userConf)
     }
 
-    override def onStart(startTime: StartTime): Unit = {
-      source.open(taskContext, startTime.startTime)
+    override def onStart(startTime: Instant): Unit = {
+      source.open(taskContext, startTime)
       self ! Message("start", System.currentTimeMillis())
     }
 
@@ -256,9 +253,6 @@ object OpTranslator {
         GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
     }
 
-    override def onStart(startTime: StartTime): Unit = {
-    }
-
     override def onNext(msg: Message): Unit = {
       val time = msg.timestamp
 
@@ -281,7 +275,7 @@ object OpTranslator {
         taskContext, userConf)
     }
 
-    override def onStart(startTime: StartTime): Unit = {
+    override def onStart(startTime: Instant): Unit = {
       dataSink.open(taskContext)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
index f8bc0ab..0db44f2 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
@@ -18,9 +18,11 @@
 
 package org.apache.gearpump.streaming.sink
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 object DataSinkTask {
   val DATA_SINK = "data_sink"
@@ -32,11 +34,12 @@ object DataSinkTask {
 class DataSinkTask private[sink](context: TaskContext, conf: UserConfig, sink: DataSink)
   extends Task(context, conf) {
 
+
   def this(context: TaskContext, conf: UserConfig) = {
     this(context, conf, conf.getValue[DataSink](DataSinkTask.DATA_SINK)(context.system).get)
   }
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     LOG.info("opening data sink...")
     sink.open(context)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
index 0fb6db4..f55d102 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
@@ -18,11 +18,11 @@
 
 package org.apache.gearpump.streaming.source
 
+import java.time.Instant
+
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.Message
 
-import scala.util.Random
-
 /**
  * Interface to implement custom source where data is read into the system.
  * a DataSource could be a message queue like kafka or simply data generation source.
@@ -52,7 +52,7 @@ trait DataSource extends java.io.Serializable {
    * @param context is the task context at runtime
    * @param startTime is the start time of system
    */
-  def open(context: TaskContext, startTime: Long): Unit
+  def open(context: TaskContext, startTime: Instant): Unit
 
   /**
    * Reads next message from data source and

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index f845628..468ae3b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -18,9 +18,11 @@
 
 package org.apache.gearpump.streaming.source
 
+import java.time.Instant
+
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 object DataSourceTask {
   val DATA_SOURCE = "data_source"
@@ -45,10 +47,8 @@ class DataSourceTask private[source](context: TaskContext, conf: UserConfig, sou
     this(context, conf, conf.getValue[DataSource](DataSourceTask.DATA_SOURCE)(context.system).get)
   }
   private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
-  private var startTime = 0L
 
-  override def onStart(newStartTime: StartTime): Unit = {
-    startTime = newStartTime.startTime
+  override def onStart(startTime: Instant): Unit = {
     LOG.info(s"opening data source at $startTime")
     source.open(context, startTime)
     self ! Message("start", System.currentTimeMillis())

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
index c7b503e..aceff4a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -18,12 +18,13 @@
 
 package org.apache.gearpump.streaming.state.api
 
+import java.time.Instant
 import java.util.concurrent.TimeUnit
 import scala.concurrent.duration.FiniteDuration
 
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
-import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, Task, TaskContext}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 import org.apache.gearpump.util.LogUtil
 import org.apache.gearpump.{Message, TimeStamp}
@@ -70,8 +71,8 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
   /** Persistent state that will be stored (by checkpointing) automatically to storage like HDFS */
   val state = persistentState
 
-  final override def onStart(startTime: StartTime): Unit = {
-    val timestamp = startTime.startTime
+  final override def onStart(startTime: Instant): Unit = {
+    val timestamp = startTime.toEpochMilli
     checkpointManager
       .recover(timestamp)
       .foreach(state.recover(timestamp, _))
@@ -101,6 +102,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
     }
   }
 
+
   final override def onStop(): Unit = {
     checkpointManager.close()
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala
deleted file mode 100644
index fb097d3..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.task
-
-import org.apache.gearpump.TimeStamp
-
-/** Start time of streaming application. All message older than start time will be dropped */
-case class StartTime(startTime: TimeStamp = 0)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
index 9c76a40..c94dec4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.streaming.task
 
+import java.time.Instant
+
 import scala.concurrent.duration.FiniteDuration
 
 import akka.actor.Actor.Receive
@@ -133,7 +135,7 @@ trait TaskInterface {
    *                  replay the data source, or from when a processor task should recover its
    *                  checkpoint data in to in-memory state.
    */
-  def onStart(startTime: StartTime): Unit
+  def onStart(startTime: Instant): Unit
 
   /**
    * Method called for each message received.
@@ -176,7 +178,7 @@ abstract class Task(taskContext: TaskContext, userConf: UserConfig) extends Task
    */
   protected def sender: ActorRef = taskContext.sender
 
-  def onStart(startTime: StartTime): Unit = {}
+  def onStart(startTime: Instant): Unit = {}
 
   def onNext(msg: Message): Unit = {}
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index d12aac1..30a24fa 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -18,12 +18,12 @@
 
 package org.apache.gearpump.streaming.task
 
+import java.time.Instant
 import java.util
 import java.util.concurrent.TimeUnit
 
 import akka.actor._
 import org.slf4j.Logger
-
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
 import org.apache.gearpump.metrics.Metrics
@@ -101,7 +101,7 @@ class TaskActor(
 
   task.setTaskActor(this)
 
-  def onStart(startTime: StartTime): Unit = {
+  def onStart(startTime: Instant): Unit = {
     task.onStart(startTime)
   }
 
@@ -111,6 +111,7 @@ class TaskActor(
 
   def onStop(): Unit = task.onStop()
 
+
   /**
    * output to a downstream by specifying a arrayIndex
    * @param arrayIndex this is not same as ProcessorId
@@ -193,7 +194,7 @@ class TaskActor(
     // Put this as the last step so that the subscription is already initialized.
     // Message sending in current Task before onStart will not be delivered to
     // target
-    onStart(new StartTime(upstreamMinClock))
+    onStart(Instant.ofEpochMilli(upstreamMinClock))
 
     appMaster ! GetUpstreamMinClock(taskId)
     context.become(handleMessages(sender))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
index cd33f7e..31c991e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
@@ -18,15 +18,15 @@
 
 package org.apache.gearpump.streaming.task
 
-import scala.concurrent.duration.FiniteDuration
+import java.time.Instant
 
+import scala.concurrent.duration.FiniteDuration
 import akka.actor.Actor._
 import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
 import org.slf4j.Logger
-
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.{TimeStamp, Message}
 
 /**
  * This provides TaskContext for user defined tasks
@@ -41,7 +41,7 @@ class TaskWrapper(
 
   private val LOG = LogUtil.getLogger(taskClass, task = taskId)
 
-  private var actor: TaskActor = null
+  private var actor: TaskActor = _
 
   private var task: Option[Task] = None
 
@@ -87,8 +87,8 @@ class TaskWrapper(
 
   override def actorOf(props: Props, name: String): ActorRef = actor.context.actorOf(props, name)
 
-  override def onStart(startTime: StartTime): Unit = {
-    if (None != task) {
+  override def onStart(startTime: Instant): Unit = {
+    if (task.isDefined) {
       LOG.error(s"Task.onStart should not be called multiple times... ${task.getClass}")
     }
     val constructor = taskClass.getConstructor(classOf[TaskContext], classOf[UserConfig])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
index c9f1b89..647ad0a 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -38,7 +38,7 @@ import org.apache.gearpump.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.AppMasterToExecutor.StopTask
 import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, UnRegisterTask}
 import org.apache.gearpump.streaming.appmaster.AppMaster.{TaskActorRef, LookupTaskActorRef}
-import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _}
+import org.apache.gearpump.streaming.task.{TaskContext, _}
 import org.apache.gearpump.streaming.{Constants, DAG, Processor, StreamApplication}
 import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem
 import org.apache.gearpump.util.{ActorUtil, Graph}
@@ -300,17 +300,9 @@ object AppMasterSpec {
 }
 
 class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-
-  override def onStart(startTime: StartTime): Unit = {
-  }
-
   override def onNext(msg: Message): Unit = {}
 }
 
 class TaskB(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-
-  override def onStart(startTime: StartTime): Unit = {
-  }
-
   override def onNext(msg: Message): Unit = {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index 0dd3e5b..bb495a7 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -36,7 +36,7 @@ import org.apache.gearpump.streaming.appmaster.JarScheduler.ResourceRequestDetai
 import org.apache.gearpump.streaming.appmaster.TaskManager.ApplicationReady
 import org.apache.gearpump.streaming.appmaster.TaskManagerSpec.{Env, Task1, Task2}
 import org.apache.gearpump.streaming.executor.Executor.RestartTasks
-import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _}
+import org.apache.gearpump.streaming.task.{TaskContext, _}
 import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId}
 import org.apache.gearpump.transport.HostPort
 import org.apache.gearpump.util.Graph
@@ -270,13 +270,11 @@ object TaskManagerSpec {
 
   class Task1(taskContext: TaskContext, userConf: UserConfig)
     extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = {}
     override def onNext(msg: Message): Unit = {}
   }
 
   class Task2(taskContext: TaskContext, userConf: UserConfig)
     extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = {}
     override def onNext(msg: Message): Unit = {}
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
index 4a532dd..864aa93 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
@@ -18,14 +18,13 @@
 package org.apache.gearpump.streaming.appmaster
 
 import com.typesafe.config.ConfigFactory
-import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.appmaster.TaskLocator.Localities
 import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}
 import org.apache.gearpump.streaming.{DAG, ProcessorDescription}
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph._
@@ -108,8 +107,8 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
 
       taskScheduler.setDAG(dag)
       val tasks = taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, Resource(4))
-      assert(tasks.filter(_.processorId == 0).length == 2)
-      assert(tasks.filter(_.processorId == 1).length == 2)
+      assert(tasks.count(_.processorId == 0) == 2)
+      assert(tasks.count(_.processorId == 1) == 2)
     }
   }
 }
@@ -117,13 +116,9 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
 object TaskSchedulerSpec {
   class TestTask1(taskContext: TaskContext, userConf: UserConfig)
     extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = Unit
-    override def onNext(msg: Message): Unit = Unit
   }
 
   class TestTask2(taskContext: TaskContext, userConf: UserConfig)
     extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = Unit
-    override def onNext(msg: Message): Unit = Unit
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
index 6bdd8aa..82979e0 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
@@ -26,7 +26,7 @@ import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner}
 import org.apache.gearpump.streaming.dsl.StreamSpec.Join
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph._
 import org.mockito.Mockito.when
@@ -39,6 +39,7 @@ import scala.util.{Either, Left, Right}
 
 class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
 
+
   implicit var system: ActorSystem = null
 
   override def beforeAll(): Unit = {
@@ -108,7 +109,6 @@ object StreamSpec {
   class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
 
     var query: String = null
-    override def onStart(startTime: StartTime): Unit = {}
 
     override def onNext(msg: Message): Unit = {
       msg.msg match {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
index ecc5352..144df0f 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.streaming.dsl.plan
 
+import java.time.Instant
+
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 
@@ -33,10 +35,10 @@ import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.dsl.CollectionDataSource
 import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.task.StartTime
 
 class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
 
+
   "andThen" should "chain multiple single input function" in {
     val dummy = new DummyInputFunction[String]
     val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")
@@ -74,7 +76,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
     // Source with no transformer
     val source = new SourceTask[String, String](new CollectionDataSource[String](data), None,
       taskContext, conf)
-    source.onStart(StartTime(0))
+    source.onStart(Instant.EPOCH)
     source.onNext(Message("next"))
     verify(taskContext, times(1)).output(anyObject())
 
@@ -83,7 +85,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
     val double = new FlatMapFunction[String, String](word => List(word, word), "double")
     val another = new SourceTask(new CollectionDataSource[String](data), Some(double),
       anotherTaskContext, conf)
-    another.onStart(StartTime(0))
+    another.onStart(Instant.EPOCH)
     another.onNext(Message("next"))
     verify(anotherTaskContext, times(2)).output(anyObject())
   }
@@ -106,7 +108,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
     val taskContext = MockUtil.mockTaskContext
 
     val task = new GroupByTask[String, String, String](input => input, taskContext, config)
-    task.onStart(StartTime(0))
+    task.onStart(Instant.EPOCH)
 
     val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
 
@@ -130,7 +132,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
     val conf = UserConfig.empty
     val double = new FlatMapFunction[String, String](word => List(word, word), "double")
     val task = new TransformTask[String, String](Some(double), taskContext, conf)
-    task.onStart(StartTime(0))
+    task.onStart(Instant.EPOCH)
 
     val data = "1 2  2  3 3  3".split("\\s+")
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
index 7a2c2d1..55e59e0 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
@@ -18,10 +18,11 @@
 
 package org.apache.gearpump.streaming.sink
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -30,13 +31,14 @@ import org.scalatest.{PropSpec, Matchers}
 
 class DataSinkTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
+
   property("DataSinkTask.onStart should call DataSink.open" ) {
-    forAll(Gen.chooseNum[Long](0L, 1000L)) { (startTime: Long) =>
+    forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
       val taskContext = MockUtil.mockTaskContext
       val config = UserConfig.empty
       val dataSink = mock[DataSink]
       val sinkTask = new DataSinkTask(taskContext, config, dataSink)
-      sinkTask.onStart(StartTime(startTime))
+      sinkTask.onStart(startTime)
       verify(dataSink).open(taskContext)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index d4d580f..ae9bf37 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -18,10 +18,11 @@
 
 package org.apache.gearpump.streaming.source
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.{TaskContext, StartTime}
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -31,7 +32,7 @@ import org.scalatest.prop.PropertyChecks
 class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
   property("DataSourceTask.onStart should call DataSource.open") {
-    forAll(Gen.chooseNum[Long](0L, 1000L)) { (startTime: Long) =>
+    forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
       val taskContext = MockUtil.mockTaskContext
       implicit val system = MockUtil.system
       val dataSource = mock[DataSource]
@@ -40,7 +41,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
 
       val sourceTask = new DataSourceTask(taskContext, config, dataSource)
 
-      sourceTask.onStart(StartTime(startTime))
+      sourceTask.onStart(startTime)
       verify(dataSource).open(taskContext, startTime)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 4afee8b..258a5ff 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -31,6 +31,7 @@ import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
 import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription}
 
 class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
+
   val appId = 0
   val executorId = 0
   val taskId = TaskId(0, 0)
@@ -132,11 +133,5 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
 object SubscriptionSpec {
 
   class NextTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
-    override def onStart(startTime: StartTime): Unit = {
-    }
-
-    override def onNext(msg: Message): Unit = {
-    }
   }
 }
\ No newline at end of file


[3/3] incubator-gearpump git commit: [GEARPUMP-188] use java.time.Instant for Task start time

Posted by ma...@apache.org.
[GEARPUMP-188] use java.time.Instant for Task start time

Author: manuzhang <ow...@gmail.com>

Closes #74 from manuzhang/replace_timestamp_with_instant.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/23daf0cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/23daf0cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/23daf0cf

Branch: refs/heads/master
Commit: 23daf0cf9c1db3fabc1b679993fcf1d6edb43e7d
Parents: 6d919ec
Author: manuzhang <ow...@gmail.com>
Authored: Mon Aug 15 23:04:11 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Aug 15 23:04:11 2016 +0800

----------------------------------------------------------------------
 .../pagerank/PageRankController.scala           |   4 +-
 .../streaming/examples/complexdag/Node.scala    |   6 +-
 .../streaming/examples/complexdag/Sink.scala    |   6 +-
 .../streaming/examples/complexdag/Source.scala  |   6 +-
 .../examples/fsio/SeqFileStreamProcessor.scala  |   8 +-
 .../examples/fsio/SeqFileStreamProducer.scala   |   9 +-
 .../fsio/SeqFileStreamProcessorSpec.scala       |   5 +-
 .../fsio/SeqFileStreamProducerSpec.scala        |   5 +-
 .../examples/kafka/wordcount/Split.scala        |   7 +-
 .../examples/kafka/wordcount/Sum.scala          |   7 +-
 .../examples/kafka/wordcount/SumSpec.scala      |   5 +-
 .../examples/sol/SOLStreamProcessor.scala       |   5 +-
 .../examples/sol/SOLStreamProducer.scala        |   5 +-
 .../examples/sol/SOLStreamProcessorSpec.scala   |   5 +-
 .../examples/sol/SOLStreamProducerSpec.scala    |   5 +-
 .../processor/NumberGeneratorProcessor.scala    |   8 +-
 .../state/processor/CountProcessorSpec.scala    |   6 +-
 .../NumberGeneratorProcessorSpec.scala          |   5 +-
 .../processor/WindowAverageProcessorSpec.scala  |   6 +-
 examples/streaming/stockcrawler/README.md       |  19 --
 .../src/main/resources/geardefault.conf         |   9 -
 .../src/main/resources/stock/css/body.png       | Bin 201 -> 0 bytes
 .../src/main/resources/stock/css/custom.css     | 115 -----------
 .../src/main/resources/stock/css/foot.png       | Bin 2250 -> 0 bytes
 .../src/main/resources/stock/css/header.png     | Bin 17350 -> 0 bytes
 .../src/main/resources/stock/js/stock.js        | 157 ---------------
 .../src/main/resources/stock/stock.html         |  87 --------
 .../streaming/examples/stock/Analyzer.scala     | 170 ----------------
 .../streaming/examples/stock/Crawler.scala      |  60 ------
 .../streaming/examples/stock/Data.scala         |  61 ------
 .../streaming/examples/stock/QueryServer.scala  | 134 -------------
 .../streaming/examples/stock/StockMarket.scala  | 155 ---------------
 .../streaming/examples/stock/main/Stock.scala   |  86 --------
 examples/streaming/transport/README.md          |   3 -
 .../src/main/resources/geardefault.conf         |  12 --
 .../src/main/resources/transport/css/body.png   | Bin 201 -> 0 bytes
 .../src/main/resources/transport/css/custom.css | 115 -----------
 .../src/main/resources/transport/css/foot.png   | Bin 2250 -> 0 bytes
 .../src/main/resources/transport/css/header.png | Bin 17350 -> 0 bytes
 .../main/resources/transport/js/transport.js    | 180 -----------------
 .../main/resources/transport/svg/beijing.svg    | 199 -------------------
 .../src/main/resources/transport/transport.html |  88 --------
 .../streaming/examples/transport/Data.scala     |  32 ---
 .../examples/transport/DataSource.scala         |  56 ------
 .../examples/transport/QueryServer.scala        | 154 --------------
 .../examples/transport/Transport.scala          |  69 -------
 .../examples/transport/VelocityInspector.scala  | 123 ------------
 .../examples/transport/generator/MockCity.scala |  88 --------
 .../generator/PassRecordGenerator.scala         |  69 -------
 .../examples/transport/DataSourceSpec.scala     |  45 -----
 .../examples/transport/TransportSpec.scala      |  69 -------
 .../transport/generator/MockCitySpec.scala      |  31 ---
 .../generator/PassRecordGeneratorSpec.scala     |  34 ----
 .../streaming/examples/wordcountjava/Split.java |   5 +-
 .../streaming/examples/wordcountjava/Sum.java   |   4 +-
 .../streaming/examples/wordcount/Split.scala    |   5 +-
 .../streaming/examples/wordcount/Sum.scala      |   5 +-
 .../examples/wordcount/SplitSpec.scala          |   7 +-
 .../streaming/examples/wordcount/SumSpec.scala  |   5 +-
 .../storm/processor/StormProcessor.scala        |   3 +-
 .../storm/producer/StormProducer.scala          |   3 +-
 .../storm/topology/GearpumpStormComponent.scala |   9 +-
 .../storm/processor/StormProcessorSpec.scala    |   5 +-
 .../storm/producer/StormProducerSpec.scala      |   5 +-
 .../topology/GearpumpStormComponentSpec.scala   |   9 +-
 .../topology/GearpumpStormTopologySpec.scala    |   1 -
 .../kafka/lib/source/AbstractKafkaSource.scala  |   5 +-
 .../streaming/kafka/KafkaSourceSpec.scala       |  10 +-
 .../apache/gearpump/streaming/javaapi/Task.java |   5 +-
 .../gearpump/streaming/dsl/StreamApp.scala      |  32 +--
 .../streaming/dsl/plan/OpTranslator.scala       |  24 +--
 .../gearpump/streaming/sink/DataSinkTask.scala  |   7 +-
 .../gearpump/streaming/source/DataSource.scala  |   6 +-
 .../streaming/source/DataSourceTask.scala       |   8 +-
 .../streaming/state/api/PersistentTask.scala    |   8 +-
 .../gearpump/streaming/task/StartTime.scala     |  24 ---
 .../apache/gearpump/streaming/task/Task.scala   |   6 +-
 .../gearpump/streaming/task/TaskActor.scala     |   7 +-
 .../gearpump/streaming/task/TaskWrapper.scala   |  12 +-
 .../streaming/appmaster/AppMasterSpec.scala     |  10 +-
 .../streaming/appmaster/TaskManagerSpec.scala   |   4 +-
 .../streaming/appmaster/TaskSchedulerSpec.scala |  11 +-
 .../gearpump/streaming/dsl/StreamSpec.scala     |   4 +-
 .../streaming/dsl/plan/OpTranslatorSpec.scala   |  12 +-
 .../streaming/sink/DataSinkTaskSpec.scala       |   8 +-
 .../streaming/source/DataSourceTaskSpec.scala   |   7 +-
 .../streaming/task/SubscriptionSpec.scala       |   7 +-
 87 files changed, 199 insertions(+), 2617 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
index d461876..aa250da 100644
--- a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.experiments.pagerank
 
+import java.time.Instant
+
 import akka.actor.Actor.Receive
 
 import org.apache.gearpump.cluster.UserConfig
@@ -39,7 +41,7 @@ class PageRankController(taskContext: TaskContext, conf: UserConfig)
   var weights = Map.empty[TaskId, Double]
   var deltas = Map.empty[TaskId, Double]
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     output(Tick(tick), tasks: _*)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
index 8d163f9..ddd4d1a 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
@@ -18,14 +18,16 @@
 
 package org.apache.gearpump.streaming.examples.complexdag
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
-  override def onStart(startTime: StartTime): Unit = {}
+  override def onStart(startTime: Instant): Unit = {}
 
   override def onNext(msg: Message): Unit = {
     val list = msg.msg.asInstanceOf[Vector[String]]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
index 8dfa565..e9b00a0 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
@@ -18,9 +18,11 @@
 
 package org.apache.gearpump.streaming.examples.complexdag
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 import scala.collection.mutable
 
@@ -28,7 +30,7 @@ class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
 
   var list = mutable.MutableList[String]()
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     list += getClass.getCanonicalName
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
index 0359519..7abb3fc 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
@@ -18,14 +18,16 @@
 
 package org.apache.gearpump.streaming.examples.complexdag
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     self ! Message("start")
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
index 2e4a556..561346e 100644
--- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
@@ -18,19 +18,19 @@
 package org.apache.gearpump.streaming.examples.fsio
 
 import java.io.File
+import java.time.Instant
 import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
 
+import scala.concurrent.duration.FiniteDuration
 import akka.actor.Cancellable
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.SequenceFile._
 import org.apache.hadoop.io.{SequenceFile, Text}
-
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
 import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
   extends Task(taskContext, config) {
@@ -49,7 +49,7 @@ class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
   private var snapShotTime: Long = 0
   private var scheduler: Cancellable = null
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
 
     val fs = FileSystem.get(hadoopConf)
     fs.deleteOnExit(outputPath)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
index 02d2434..4106a2c 100644
--- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
@@ -17,15 +17,16 @@
  */
 package org.apache.gearpump.streaming.examples.fsio
 
+import java.time.Instant
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.SequenceFile._
 import org.apache.hadoop.io.{SequenceFile, Text}
-
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
 import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProducer._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
   extends Task(taskContext, config) {
@@ -34,12 +35,12 @@ class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
 
   val value = new Text()
   val key = new Text()
-  var reader: SequenceFile.Reader = null
+  var reader: SequenceFile.Reader = _
   val hadoopConf = config.hadoopConf
   val fs = FileSystem.get(hadoopConf)
   val inputPath = new Path(config.getString(INPUT_PATH).get)
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
     self ! Start
     LOG.info("sequence file spout initiated")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
index 7831b14..2edb87f 100644
--- a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
@@ -18,6 +18,7 @@
 package org.apache.gearpump.streaming.examples.fsio
 
 import java.io.File
+import java.time.Instant
 import scala.collection.mutable.ArrayBuffer
 
 import akka.actor.ActorSystem
@@ -33,7 +34,7 @@ import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.task.{StartTime, TaskId}
+import org.apache.gearpump.streaming.task.TaskId
 import org.apache.gearpump.streaming.{MockUtil, Processor}
 class SeqFileStreamProcessorSpec
   extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
@@ -67,7 +68,7 @@ class SeqFileStreamProcessorSpec
     when(context.taskId).thenReturn(taskId)
 
     val processor = new SeqFileStreamProcessor(context, conf)
-    processor.onStart(StartTime(0))
+    processor.onStart(Instant.EPOCH)
 
     forAll(kvGenerator) { kv =>
       val (key, value) = kv

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
index ad27e63..a03e68d 100644
--- a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming.examples.fsio
 
+import java.time.Instant
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
@@ -32,7 +34,6 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.MockUtil._
-import org.apache.gearpump.streaming.task.StartTime
 
 class SeqFileStreamProducerSpec
   extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
@@ -73,7 +74,7 @@ class SeqFileStreamProducerSpec
     val context = MockUtil.mockTaskContext
 
     val producer = new SeqFileStreamProducer(context, conf)
-    producer.onStart(StartTime(0))
+    producer.onStart(Instant.EPOCH)
     producer.onNext(Message("start"))
 
     val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
index a95f596..b78e788 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
@@ -18,16 +18,17 @@
 
 package org.apache.gearpump.streaming.examples.kafka.wordcount
 
-import com.twitter.bijection.Injection
+import java.time.Instant
 
+import com.twitter.bijection.Injection
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
   }
 
   override def onNext(msg: Message): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
index 9930b92..58bb884 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
@@ -18,18 +18,19 @@
 
 package org.apache.gearpump.streaming.examples.kafka.wordcount
 
-import com.twitter.bijection.Injection
+import java.time.Instant
 
+import com.twitter.bijection.Injection
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
   private[wordcount] var wordcount = Map.empty[String, Long]
 
-  override def onStart(startTime: StartTime): Unit = {}
+  override def onStart(startTime: Instant): Unit = {}
 
   override def onNext(message: Message): Unit = {
     val word = message.msg.asInstanceOf[String]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
index 3538ece..e37118a 100644
--- a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
+++ b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming.examples.kafka.wordcount
 
+import java.time.Instant
+
 import scala.collection.mutable
 
 import org.mockito.Matchers._
@@ -27,7 +29,6 @@ import org.scalatest.{FlatSpec, Matchers}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 
 class SumSpec extends FlatSpec with Matchers {
 
@@ -39,7 +40,7 @@ class SumSpec extends FlatSpec with Matchers {
     val taskContext = MockUtil.mockTaskContext
 
     val sum = new Sum(taskContext, UserConfig.empty)
-    sum.onStart(StartTime(0))
+    sum.onStart(Instant.EPOCH)
     val str = "once two two three three three"
 
     var totalWordCount = 0

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
index 796b0d2..a16cf4c 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.streaming.examples.sol
 
+import java.time.Instant
 import java.util.concurrent.TimeUnit
 import scala.concurrent.duration.FiniteDuration
 
@@ -25,7 +26,7 @@ import akka.actor.Cancellable
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
@@ -38,7 +39,7 @@ class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig)
   private var snapShotWordCount: Long = 0
   private var snapShotTime: Long = 0
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
       new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount())
     snapShotTime = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
index 84ed038..c1b11e5 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
@@ -18,12 +18,13 @@
 
 package org.apache.gearpump.streaming.examples.sol
 
+import java.time.Instant
 import java.util.Random
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.examples.sol.SOLStreamProducer._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
@@ -36,7 +37,7 @@ class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
   private var rand: Random = null
   private var messageCount: Long = 0
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     prepareRandomMessage
     self ! Start
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
index a6cc966..e3344bf 100644
--- a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming.examples.sol
 
+import java.time.Instant
+
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.{FlatSpec, Matchers}
@@ -24,7 +26,6 @@ import org.scalatest.{FlatSpec, Matchers}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 
 class SOLStreamProcessorSpec extends FlatSpec with Matchers {
 
@@ -33,7 +34,7 @@ class SOLStreamProcessorSpec extends FlatSpec with Matchers {
     val context = MockUtil.mockTaskContext
 
     val sol = new SOLStreamProcessor(context, UserConfig.empty)
-    sol.onStart(StartTime(0))
+    sol.onStart(Instant.EPOCH)
     val msg = Message("msg")
     sol.onNext(msg)
     verify(context, times(1)).output(msg)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
index 2316de8..dc21171 100644
--- a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming.examples.sol
 
+import java.time.Instant
+
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.{Matchers, WordSpec}
@@ -24,7 +26,6 @@ import org.scalatest.{Matchers, WordSpec}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 
 class SOLStreamProducerSpec extends WordSpec with Matchers {
 
@@ -35,7 +36,7 @@ class SOLStreamProducerSpec extends WordSpec with Matchers {
       val context = MockUtil.mockTaskContext
 
       val producer = new SOLStreamProducer(context, conf)
-      producer.onStart(StartTime(0))
+      producer.onStart(Instant.EPOCH)
       producer.onNext(Message("msg"))
       verify(context).output(any[Message])
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
index 0e85f32..134afba 100644
--- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
@@ -18,17 +18,19 @@
 
 package org.apache.gearpump.streaming.examples.state.processor
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
   import taskContext.output
 
   private var num = 0L
-  override def onStart(startTime: StartTime): Unit = {
-    num = startTime.startTime
+  override def onStart(startTime: Instant): Unit = {
+    num = startTime.toEpochMilli
     self ! Message("start")
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
index 6048034..b95d164 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.streaming.examples.state.processor
 
+import java.time.Instant
+
 import scala.concurrent.Await
 import scala.concurrent.duration._
 
@@ -33,7 +35,7 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.state.api.PersistentTask
 import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig}
-import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime}
+import org.apache.gearpump.streaming.task.UpdateCheckpointClock
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 
 class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
@@ -59,7 +61,7 @@ class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
         val appMaster = TestProbe()(system)
         when(taskContext.appMaster).thenReturn(appMaster.ref)
 
-        count.onStart(StartTime(0L))
+        count.onStart(Instant.EPOCH)
         appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L))
 
         for (i <- 0L to num) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
index 2268994..d3f645c 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.streaming.examples.state.processor
 
+import java.time.Instant
+
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 
@@ -30,7 +32,6 @@ import org.scalatest.{Matchers, WordSpec}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 
 class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
   "NumberGeneratorProcessor" should {
@@ -47,7 +48,7 @@ class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
 
       val conf = UserConfig.empty
       val genNum = new NumberGeneratorProcessor(taskContext, conf)
-      genNum.onStart(StartTime(0))
+      genNum.onStart(Instant.EPOCH)
       mockTaskActor.expectMsgType[Message]
 
       genNum.onNext(Message("next"))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
index 0963429..255f869 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.streaming.examples.state.processor
 
+import java.time.Instant
+
 import scala.concurrent.Await
 import scala.concurrent.duration._
 
@@ -34,7 +36,7 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.state.api.PersistentTask
 import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig, WindowConfig}
-import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime}
+import org.apache.gearpump.streaming.task.UpdateCheckpointClock
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 
 class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Matchers {
@@ -61,7 +63,7 @@ class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Match
         val appMaster = TestProbe()(system)
         when(taskContext.appMaster).thenReturn(appMaster.ref)
 
-        windowAverage.onStart(StartTime(0L))
+        windowAverage.onStart(Instant.EPOCH)
         appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L))
 
         for (i <- 0L until num) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/README.md b/examples/streaming/stockcrawler/README.md
deleted file mode 100644
index b51590f..0000000
--- a/examples/streaming/stockcrawler/README.md
+++ /dev/null
@@ -1,19 +0,0 @@
-How to use
-===================
-1. Start local cluster, 
-  ```
-  bin/local
-  ```
-2. Submit the stock crawler
-  ```
-  bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock
-  ```
-  
-  If you are behind  a proxy, you need to set the proxy address
-  ```
-  bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock -proxy host:port
-  ```
-  
-3. Check the UI
-  http://127.0.0.1:8080  
-  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf b/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
deleted file mode 100644
index acee3bd..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
+++ /dev/null
@@ -1,9 +0,0 @@
-gearpump {
-  serializers {
-    "org.apache.gearpump.streaming.examples.stock.StockPrice" = ""
-  }
-}
-
-spray.can {
-  server.parsing.max-content-length = "10M"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png
deleted file mode 100644
index b5c536c..0000000
Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css b/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
deleted file mode 100644
index 182d722..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-.ui-datepicker {
-  font-size: 11px;
-}
-
-.sidebar-label {
-  font-size: 15px;
-  font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-.help {
-  font-size: 12px;
-  font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-div.splitter {
-  margin: 12px 0px 7px 0px;
-  clear: both;
-  border-top: 1px solid #EBEBEB;
-}
-
-input.sidebar {
-  width: 165px
-}
-
-select.sidebar {
-  width: 198px
-}
-
-table.dataintable {
-  font-family: calibri, Arial, Helvetica, sans-serif;
-  font-size: 15px;
-  margin-top: 10px;
-  border-collapse: collapse;
-  border: 1px solid #888;
-}
-
-table.dataintable th {
-  vertical-align: baseline;
-  padding: 5px 15px 5px 5px;
-  background-color: #EEE;
-  border: 1px solid #888;
-  text-align: left;
-}
-
-table.dataintable td {
-  vertical-align: text-top;
-  padding: 5px 15px 5px 5px;
-  background-color: #FFFFFF;
-  border: 1px solid #AAA;
-}
-
-#search {
-  width: 100px;
-  height: 25px;
-  position: relative;
-  left: 0px;
-  top: 5px;
-}
-
-#mytable {
-  width: 100%;
-  height: 300;
-  float: left;
-}
-
-#mychart {
-  height: 250px;
-  width: 100%;
-}
-
-#Menu {
-  height: 100%;
-  width: 245px;
-  float: left;
-}
-
-#header {
-  height: 115px;
-  background-image: url(header.png);
-}
-
-#body {
-  height: 100%;
-  width: 100%;
-  background-image: url(body.png);
-  background-size: 100% 100%;
-}
-
-#footer {
-  color: white;
-  height: 70px;
-  line-height: 70px;
-  text-align: middle;
-  clear: both;
-  text-align: center;
-  background-image: url(foot.png);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png
deleted file mode 100644
index 5db91b5..0000000
Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png
deleted file mode 100644
index 9284e44..0000000
Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js b/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
deleted file mode 100644
index 97f1e07..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-function initChart(chartid, tableid, stockId) {
-  require.config({
-    paths: {
-      echarts: 'http://echarts.baidu.com/build/dist'
-    }
-  });
-
-  require(
-    [
-      'echarts',
-      'echarts/chart/line'
-    ],
-
-    function (ec) {
-      // \u57fa\u4e8e\u51c6\u5907\u597d\u7684dom\uff0c\u521d\u59cb\u5316echarts\u56fe\u8868
-      var myChart = ec.init(document.getElementById(chartid));
-      var dataPoints = 100;
-      var timeTicket;
-      clearInterval(timeTicket);
-      timeTicket = setInterval(function () {
-        $.getJSON("report/" + stockId, function (json) {
-          STOCK_NAME = json.name
-
-          var maxDrawnDown = json.currentMax[0].max.price - json.currentMax[0].min.price;
-          var time = new Date(json.currentMax[0].current.timestamp).toLocaleTimeString().replace(/^\D*/, '');
-          // \u52a8\u6001\u6570\u636e\u63a5\u53e3 addData
-          myChart.addData([
-            [
-              0,        // \u7cfb\u5217\u7d22\u5f15
-              maxDrawnDown.toFixed(2), // \u65b0\u589e\u6570\u636e
-              false,     // \u65b0\u589e\u6570\u636e\u662f\u5426\u4ece\u961f\u5217\u5934\u90e8\u63d2\u5165
-              false,     // \u662f\u5426\u589e\u52a0\u961f\u5217\u957f\u5ea6\uff0cfalse\u5219\u81ea\u5b9a\u5220\u9664\u539f\u6709\u6570\u636e\uff0c\u961f\u5934\u63d2\u5165\u5220\u961f\u5c3e\uff0c\u961f\u5c3e\u63d2\u5165\u5220\u961f\u5934
-              time
-            ],
-            [
-              1,        // \u7cfb\u5217\u7d22\u5f15
-              json.currentMax[0].current.price.toFixed(2), // \u65b0\u589e\u6570\u636e
-              false,     // \u65b0\u589e\u6570\u636e\u662f\u5426\u4ece\u961f\u5217\u5934\u90e8\u63d2\u5165
-              false,     // \u662f\u5426\u589e\u52a0\u961f\u5217\u957f\u5ea6\uff0cfalse\u5219\u81ea\u5b9a\u5220\u9664\u539f\u6709\u6570\u636e\uff0c\u961f\u5934\u63d2\u5165\u5220\u961f\u5c3e\uff0c\u961f\u5c3e\u63d2\u5165\u5220\u961f\u5934
-              time
-            ]
-          ]);
-          document.getElementById(chartid).style.display = "block"
-          document.getElementById(tableid).innerHTML = "<pre>" + JSON.stringify(json, null, 2) + "</pre>"
-        });
-      }, 2000);
-
-      var subtext_ = "Draw Down"
-
-      var option = {
-        title: {
-          text: 'Stock Analysis',
-          subtext: "Max " + subtext_
-        },
-        tooltip: {
-          trigger: 'axis'
-        },
-        legend: {
-          data: ["Current Price", "Current Draw Down"]
-        },
-        toolbox: {
-          show: false,
-          feature: {
-            mark: {show: true},
-            dataView: {show: true, readOnly: false},
-            magicType: {show: true, type: ['line', 'bar']},
-            restore: {show: true},
-            saveAsImage: {show: true}
-          }
-        },
-        dataZoom: {
-          show: false,
-          start: 0,
-          end: 100
-        },
-        xAxis: [
-          {
-            type: 'category',
-            boundaryGap: true,
-            data: (function () {
-              var now = new Date();
-              var res = [];
-              var len = dataPoints;
-              while (len--) {
-                res.unshift(now.toLocaleTimeString().replace(/^\D*/, ''));
-                now = new Date(now - 2000);
-              }
-              return res;
-            })()
-          }
-        ],
-        yAxis: [
-          {
-            type: 'value',
-            scale: true,
-            name: subtext_ + ' \u4ef7\u683c/\u5143',
-            boundaryGap: [0, 0.3]
-          },
-          {
-            type: 'value',
-            scale: true,
-            name: 'Current \u4ef7\u683c/\u5143',
-            boundaryGap: [0, 0.1]
-          }
-        ],
-        series: [
-          {
-            name: "Current Draw Down",
-            type: 'line',
-            data: (function () {
-              var res = [];
-              var len = dataPoints;
-              while (len--) {
-                res.push(0);
-              }
-              return res;
-            })()
-          },
-          {
-            name: "Current Price",
-            type: 'line',
-            yAxisIndex: 1,
-            data: (function () {
-              var res = [];
-              var len = dataPoints;
-              while (len--) {
-                res.push(0);
-              }
-              return res;
-            })()
-          }
-        ]
-      };
-
-      // \u4e3aecharts\u5bf9\u8c61\u52a0\u8f7d\u6570\u636e
-      myChart.setOption(option);
-    }
-  );
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html b/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
deleted file mode 100644
index 9682a53..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
+++ /dev/null
@@ -1,87 +0,0 @@
-<!DOCTYPE html>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~      http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<html>
-
-<head>
-  <meta charset="utf-8">
-  <link rel=stylesheet type=text/css href="css/custom.css">
-  <script src="http://echarts.baidu.com/build/dist/echarts.js"></script>
-  <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
-  <script src="js/stock.js"></script>
-  <script type="text/javascript">
-    function search_onclick() {
-      var stockId = document.getElementById('stockId').value
-      initChart("mychart", "mytable", stockId)
-    }
-  </script>
-</head>
-
-<body style="background-color:#F2F2F2">
-<div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;">
-  <div style="height:0px"></div>
-  <div id="header">
-    <div
-      style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
-      Big Data Stock Analysis Demo
-    </div>
-  </div>
-  <div id="body">
-    <div id="Menu">
-      <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;">
-        <!-- form to post to accompany to get accompanying cars -->
-
-        <table style="width:100%">
-          <tr>
-            <td class="sidebar-label">Stock Id:</td>
-          </tr>
-          <tr>
-            <td class="sidebar-label">Example: sh600019, sz000002</td>
-          </tr>
-          <tr>
-            <td style="vertical-align:top;">
-              <input id="stockId" class="sidebar" type="text" name="stockId"/>
-            </td>
-          </tr>
-        </table>
-        <div class="splitter"></div>
-        <div>
-          <button id="search" onclick="search_onclick()">Search</button>
-        </div>
-      </div>
-    </div>
-    <div id="content"
-         style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
-      <div
-        style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
-        Analysis Result:
-      </div>
-      <div style="height:7px;background-color:#92BDF2;"></div>
-
-      <div id="mychart"></div>
-
-      <div id="mytable"></div>
-    </div>
-  </div>
-  <div id="footer">
-    Big Data Team @ Intel
-  </div>
-</div>
-</body>
-</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
deleted file mode 100644
index f6fdff2..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import scala.collection.immutable
-
-import akka.actor.Actor.Receive
-import org.joda.time.DateTime
-import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.examples.stock.Analyzer.HistoricalStates
-import org.apache.gearpump.streaming.examples.stock.Price._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Dradown analyzer
- * Definition: http://en.wikipedia.org/wiki/Drawdown_(economics)
- */
-class Analyzer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
-  val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
-
-  private var stocksToReport = immutable.Set.empty[String]
-  private var stockInfos = new immutable.HashMap[String, StockPrice]
-
-  private var currentDownwardsStates = new immutable.HashMap[String, StockPriceState]
-  private val historicalStates = new HistoricalStates()
-  private var latestTimeStamp: Long = 0L
-
-  override def onStart(startTime: StartTime): Unit = {
-    LOG.info("analyzer is started")
-  }
-
-  override def onNext(msg: Message): Unit = {
-    msg.msg match {
-      case stock: StockPrice =>
-        latestTimeStamp = stock.timestamp
-        checkDate(stock)
-        stockInfos += stock.stockId -> stock
-        val downwardsState = updateCurrentStates(stock)
-        val maxDrawdown = historicalStates.updatePresentMaximal(downwardsState)
-    }
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case get@GetReport(stockId, date) =>
-      var currentMax = currentDownwardsStates.get(stockId)
-
-      val dateTime = Option(date) match {
-        case Some(date) =>
-          currentMax = None
-          parseDate(dateFormatter, date)
-        case None =>
-          new DateTime(latestTimeStamp).withTimeAtStartOfDay
-      }
-
-      val historyMax = Option(dateTime).flatMap(handleHistoricalQuery(stockId, _))
-      val name = stockInfos.get(stockId).map(_.name).getOrElse("")
-      sender ! Report(stockId, name, dateTime.toString, historyMax, currentMax)
-  }
-
-  private def updateCurrentStates(stock: StockPrice) = {
-    var downwardsState: StockPriceState = null
-    if (currentDownwardsStates.contains(stock.stockId)) {
-      downwardsState = generateNewState(stock, currentDownwardsStates.get(stock.stockId).get)
-    } else {
-      downwardsState = StockPriceState(stock.stockId, stock, stock, stock)
-    }
-    currentDownwardsStates += stock.stockId -> downwardsState
-    downwardsState
-  }
-
-  // Update the stock's latest state.
-  private def generateNewState(currentPrice: Price, oldState: StockPriceState): StockPriceState = {
-    if (currentPrice.price > oldState.max.price) {
-      StockPriceState(oldState.stockID, currentPrice, currentPrice, currentPrice)
-    } else {
-      val newState = StockPriceState(oldState.stockID, oldState.max,
-        Price.min(currentPrice, oldState.min), currentPrice)
-      newState
-    }
-  }
-
-  private def checkDate(stock: StockPrice) = {
-    if (currentDownwardsStates.contains(stock.stockId)) {
-      val now = new DateTime(stock.timestamp)
-      val lastTime = new DateTime(currentDownwardsStates.get(stock.stockId).get.current.timestamp)
-      // New day
-      if (now.getDayOfYear > lastTime.getDayOfYear || now.getYear > lastTime.getYear) {
-        currentDownwardsStates -= stock.stockId
-      }
-    }
-  }
-
-  private def parseDate(format: DateTimeFormatter, input: String): DateTime = {
-    format.parseDateTime(input)
-  }
-
-  private def handleHistoricalQuery(stockId: String, date: DateTime) = {
-    val maximal = historicalStates.getHistoricalMaximal(stockId, date)
-    maximal
-  }
-}
-
-object Analyzer {
-
-  class HistoricalStates {
-    val LOG = LogUtil.getLogger(getClass)
-    val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
-    private var historicalMaxRaise = new immutable.HashMap[(String, DateTime), StockPriceState]
-    private var historicalMaxDrawdown = new immutable.HashMap[(String, DateTime), StockPriceState]
-
-    def updatePresentMaximal(newState: StockPriceState): Option[StockPriceState] = {
-      val date = Analyzer.getDateFromTimeStamp(newState.current.timestamp)
-      var newMaximalState: Option[StockPriceState] = null
-      if (newState.max.price < Float.MinPositiveValue) {
-        newMaximalState = generateNewMaximal(newState, date, historicalMaxRaise)
-        if (newMaximalState.nonEmpty) {
-          historicalMaxRaise += (newState.stockID, date) -> newMaximalState.get
-        }
-      } else {
-        newMaximalState = generateNewMaximal(newState, date, historicalMaxDrawdown)
-        if (newMaximalState.nonEmpty) {
-          historicalMaxDrawdown += (newState.stockID, date) -> newMaximalState.get
-        }
-      }
-      newMaximalState
-    }
-
-    def getHistoricalMaximal(stockId: String, date: DateTime): Option[StockPriceState] = {
-      historicalMaxDrawdown.get((stockId, date))
-    }
-
-    private def generateNewMaximal(
-        state: StockPriceState,
-        date: DateTime,
-        map: immutable.HashMap[(String, DateTime), StockPriceState])
-      : Option[StockPriceState] = {
-      val maximal = map.get((state.stockID, date))
-      if (maximal.nonEmpty && maximal.get.drawDown > state.drawDown) {
-        None
-      } else {
-        Some(state)
-      }
-    }
-  }
-
-  def getDateFromTimeStamp(timestamp: Long): DateTime = {
-    new DateTime(timestamp).withTimeAtStartOfDay()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
deleted file mode 100644
index bb444dd..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import scala.concurrent.duration._
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Crawler(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
-  import taskContext._
-
-  val FetchStockPrice = Message("FetchStockPrice")
-
-  lazy val stocks = {
-    val stockIds = conf.getValue[Array[String]]("StockId").get
-    val size = if (stockIds.length % parallelism > 0) {
-      stockIds.length / parallelism + 1
-    } else {
-      stockIds.length / parallelism
-    }
-
-    val start = taskId.index * size
-    val end = (taskId.index + 1) * size
-    stockIds.slice(start, end)
-  }
-
-  scheduleOnce(1.seconds)(self ! FetchStockPrice)
-
-  val stockMarket = conf.getValue[StockMarket](classOf[StockMarket].getName).get
-
-  override def onStart(startTime: StartTime): Unit = {
-    // Nothing
-  }
-
-  override def onNext(msg: Message): Unit = {
-    stockMarket.getPrice(stocks).foreach { price =>
-      output(new Message(price, price.timestamp))
-    }
-    scheduleOnce(5.seconds)(self ! FetchStockPrice)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
deleted file mode 100644
index 94a85ff..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-// scalastyle:off equals.hash.code  case class has equals defined
-case class StockPrice(
-    stockId: String, name: String, price: String, delta: String, pecent: String, volume: String,
-    money: String, timestamp: Long) {
-  override def hashCode: Int = stockId.hashCode
-}
-// scalastyle:on equals.hash.code  case class has equals defined
-
-case class Price(price: Float, timestamp: Long)
-
-object Price {
-
-  import scala.language.implicitConversions
-
-  implicit def StockPriceToPrice(stock: StockPrice): Price = {
-    Price(stock.price.toFloat, stock.timestamp)
-  }
-
-  def min(first: Price, second: Price): Price = {
-    if (first.price < second.price) {
-      first
-    } else {
-      second
-    }
-  }
-}
-
-case class StockPriceState(stockID: String, max: Price, min: Price, current: Price) {
-
-  def drawDownPeriod: Long = min.timestamp - max.timestamp
-
-  def recoveryPeriod: Long = current.timestamp - min.timestamp
-
-  def drawDown: Float = max.price - min.price
-}
-
-case class GetReport(stockId: String, date: String)
-
-case class Report(
-    stockId: String, name: String, date: String, historyMax: Option[StockPriceState],
-    currentMax: Option[StockPriceState])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
deleted file mode 100644
index 01ccb3e..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-import akka.actor.Actor._
-import akka.actor.{Actor, ActorRefFactory, Props}
-import akka.io.IO
-import akka.pattern.ask
-import spray.can.Http
-import spray.http.StatusCodes
-import spray.json._
-import spray.routing.{HttpService, Route}
-import upickle.default.write
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterDataDetailRequest
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary}
-import org.apache.gearpump.streaming.examples.stock.QueryServer.WebServer
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-
-class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import scala.concurrent.ExecutionContext.Implicits.global
-
-  import taskContext.{appId, appMaster}
-
-  var analyzer: (ProcessorId, ProcessorSummary) = null
-  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-
-  override def onStart(startTime: StartTime): Unit = {
-    appMaster ! AppMasterDataDetailRequest(appId)
-    taskContext.actorOf(Props(new WebServer))
-  }
-
-  override def onNext(msg: Message): Unit = {
-    // Skip
-  }
-
-  override def receiveUnManagedMessage: Receive = messageHandler
-
-  def messageHandler: Receive = {
-    case detail: StreamAppMasterSummary =>
-      analyzer = detail.processors.find { kv =>
-        val (processorId, processor) = kv
-        processor.taskClass == classOf[Analyzer].getName
-      }.get
-    case getReport@GetReport(stockId, date) =>
-      val parallism = analyzer._2.parallelism
-      val processorId = analyzer._1
-      val analyzerTaskId = TaskId(processorId, (stockId.hashCode & Integer.MAX_VALUE) % parallism)
-      val requester = sender
-      import scala.concurrent.Future
-      (appMaster ? LookupTaskActorRef(analyzerTaskId))
-        .asInstanceOf[Future[TaskActorRef]].flatMap { task =>
-
-        (task.task ? getReport).asInstanceOf[Future[Report]]
-      }.map { report =>
-        LOG.info(s"reporting $report")
-        requester ! report
-      }
-    case _ =>
-    // Ignore
-  }
-}
-
-object QueryServer {
-  class WebServer extends Actor with HttpService {
-
-    import context.dispatcher
-    implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-    def actorRefFactory: ActorRefFactory = context
-    implicit val system = context.system
-
-    IO(Http) ! Http.Bind(self, interface = "localhost", port = 8080)
-
-    override def receive: Receive = runRoute(webServer ~ staticRoute)
-
-    def webServer: Route = {
-      path("report" / Segment) { stockId =>
-        get {
-          onComplete((context.parent ? GetReport(stockId, null)).asInstanceOf[Future[Report]]) {
-            case Success(report: Report) =>
-              val json = write(report)
-              complete(pretty(json))
-            case Failure(ex) => complete(StatusCodes.InternalServerError,
-              s"An error occurred: ${ex.getMessage}")
-          }
-        }
-      }
-    }
-
-    val staticRoute = {
-      pathEndOrSingleSlash {
-        getFromResource("stock/stock.html")
-      } ~
-      pathPrefix("css") {
-        get {
-          getFromResourceDirectory("stock/css")
-        }
-      } ~
-      pathPrefix("js") {
-        get {
-          getFromResourceDirectory("stock/js")
-        }
-      }
-    }
-
-    private def pretty(json: String): String = {
-      json.parseJson.prettyPrint
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
deleted file mode 100644
index 24e050b..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import java.nio.charset.Charset
-import scala.io.Codec
-
-import org.apache.commons.httpclient.methods.GetMethod
-import org.apache.commons.httpclient.{HttpClient, MultiThreadedHttpConnectionManager}
-import org.htmlcleaner.{HtmlCleaner, TagNode}
-import org.joda.time.{DateTime, DateTimeZone}
-
-import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.LogUtil
-
-class StockMarket(service: ServiceHour, proxy: HostPort = null) extends Serializable {
-
-  private def LOG = LogUtil.getLogger(getClass)
-
-  @transient
-  private var connectionManager: MultiThreadedHttpConnectionManager = null
-
-  private val eastMoneyStockPage = "http://quote.eastmoney.com/stocklist.html"
-
-  private val stockPriceParser =
-    """^var\shq_str_s_([a-z0-9A-Z]+)="([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)";$""".r
-
-  def shutdown(): Unit = {
-    Option(connectionManager).map(_.shutdown())
-  }
-
-  @transient
-  private var _client: HttpClient = null
-
-  private def client: HttpClient = {
-    _client = Option(_client).getOrElse {
-      val connectionManager = new MultiThreadedHttpConnectionManager()
-      val client = new HttpClient(connectionManager)
-      Option(proxy).map(host => client.getHostConfiguration().setProxy(host.host, host.port))
-      client
-    }
-    _client
-  }
-
-  def getPrice(stocks: Array[String]): Array[StockPrice] = {
-
-    LOG.info(s"getPrice 1")
-
-    val query = "http://hq.sinajs.cn/list=" + stocks.map("s_" + _).mkString(",")
-    if (service.inService) {
-
-      LOG.info(s"getPrice 2")
-
-      val get = new GetMethod(query)
-      client.executeMethod(get)
-      val current = System.currentTimeMillis()
-
-      val output = scala.io.Source.fromInputStream(get.getResponseBodyAsStream)(
-        new Codec(Charset forName "GBK")).getLines().flatMap { line =>
-        line match {
-          case stockPriceParser(stockId, name, price, delta, pecent, volume, money) =>
-            Some(StockPrice(stockId, name, price, delta, pecent, volume, money, current))
-          case _ =>
-            None
-        }
-      }.toArray
-
-      LOG.info(s"getPrice 3 ${output.length}")
-
-      output
-    } else {
-      Array.empty[StockPrice]
-    }
-  }
-
-  private val urlPattern = """^.*/([a-zA-Z0-9]+)\.html$""".r
-
-  def getStockIdList: Array[String] = {
-    val cleaner = new HtmlCleaner
-    val props = cleaner.getProperties
-
-    val get = new GetMethod(eastMoneyStockPage)
-    client.executeMethod(get)
-
-    val root = cleaner.clean(get.getResponseBodyAsStream)
-
-    val stockUrls = root.evaluateXPath("//div[@id='quotesearch']//li//a[@href]")
-
-    val elements = root.getElementsByName("a", true)
-
-    val hrefs = (0 until stockUrls.length)
-      .map(stockUrls(_).asInstanceOf[TagNode].getAttributeByName("href"))
-      .map { url =>
-        url match {
-          case urlPattern(code) => code
-          case _ => null
-        }
-      }.toArray
-    hrefs
-  }
-}
-
-object StockMarket {
-
-  class ServiceHour(all: Boolean) extends Serializable {
-
-    /**
-     * Morning openning: 9:30 am - 11:30 am
-     */
-    val morningStart = GMT8(new DateTime(0, 1, 1, 9, 30)).getMillis
-    val morningEnd = GMT8(new DateTime(0, 1, 1, 11, 30)).getMillis
-
-    /**
-     * After noon openning: 13:00 pm - 15:00 pm
-     */
-    val afternoonStart = GMT8(new DateTime(0, 1, 1, 13, 0)).getMillis
-    val afternoonEnd = GMT8(new DateTime(0, 1, 1, 15, 0)).getMillis
-
-    def inService: Boolean = {
-
-      if (all) {
-        true
-      } else {
-        val now = GMT8(DateTime.now()).withDate(0, 1, 1).getMillis
-        if (now >= morningStart && now <= morningEnd ||
-          now >= afternoonStart && now <= afternoonEnd) {
-          true
-        } else {
-          false
-        }
-      }
-    }
-
-    private def GMT8(time: DateTime): DateTime = {
-      time.withZone(DateTimeZone.UTC).plusHours(8)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
deleted file mode 100644
index 6d17c20..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock.main
-
-import akka.actor.ActorSystem
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
-import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour
-import org.apache.gearpump.streaming.examples.stock.{Analyzer, Crawler, QueryServer, StockMarket}
-import org.apache.gearpump.streaming.{Processor, StreamApplication}
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.Graph.Node
-import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
-
-/** Tracks the China's stock market index change */
-object Stock extends AkkaApp with ArgumentsParser {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "crawler" -> CLIOption[Int]("<how many fetcher to get data from remote>",
-      required = false, defaultValue = Some(10)),
-    "analyzer" -> CLIOption[Int]("<parallism of analyzer>",
-      required = false, defaultValue = Some(1)),
-    "proxy" -> CLIOption[String]("proxy setting host:port, for example: 127.0.0.1:8443",
-      required = false, defaultValue = Some("")))
-
-  def crawler(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
-    val crawler = Processor[Crawler](config.getInt("crawler"))
-    val analyzer = Processor[Analyzer](config.getInt("analyzer"))
-    val queryServer = Processor[QueryServer](1)
-
-    val proxySetting = config.getString("proxy")
-    val proxy = if (proxySetting.isEmpty) {
-      null
-    } else HostPort(proxySetting)
-    val stockMarket = new StockMarket(new ServiceHour(true), proxy)
-    val stocks = stockMarket.getStockIdList
-
-    // scalastyle:off println
-    Console.println(s"Successfully fetched stock id for ${stocks.length} stocks")
-    // scalastyle:on println
-
-    val userConfig = UserConfig.empty.withValue("StockId", stocks)
-      .withValue[StockMarket](classOf[StockMarket].getName, stockMarket)
-    val partitioner = new HashPartitioner
-
-    val p1 = crawler ~ partitioner ~> analyzer
-    val p2 = Node(queryServer)
-    val graph = Graph(p1, p2)
-    val app = StreamApplication("stock_direct_analyzer", graph, userConfig
-    )
-    app
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-    val context = ClientContext(akkaConf)
-
-    implicit val system = context.system
-
-    val app = crawler(config)
-    val appId = context.submit(app)
-    context.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/README.md b/examples/streaming/transport/README.md
deleted file mode 100644
index fc9bdfe..0000000
--- a/examples/streaming/transport/README.md
+++ /dev/null
@@ -1,3 +0,0 @@
-What is this?
-=============
-A smart transportation example which simulate a city with millions of cars.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/geardefault.conf b/examples/streaming/transport/src/main/resources/geardefault.conf
deleted file mode 100644
index 0c8f421..0000000
--- a/examples/streaming/transport/src/main/resources/geardefault.conf
+++ /dev/null
@@ -1,12 +0,0 @@
-gearpump {
-
-  serializers {
-    ## Follow this format when adding new serializer for new message types
-    ##    "org.apache.gearpump.Message" = "org.apache.gearpump.streaming.MessageSerializer"
-    "org.apache.gearpump.streaming.examples.transport.PassRecord" = ""
-  }
-}
-
-spray.can {
-  server.parsing.max-content-length = "10M"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/body.png
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/body.png b/examples/streaming/transport/src/main/resources/transport/css/body.png
deleted file mode 100644
index b5c536c..0000000
Binary files a/examples/streaming/transport/src/main/resources/transport/css/body.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/custom.css
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/custom.css b/examples/streaming/transport/src/main/resources/transport/css/custom.css
deleted file mode 100644
index f324b6a..0000000
--- a/examples/streaming/transport/src/main/resources/transport/css/custom.css
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-.ui-datepicker {
-  font-size: 11px;
-}
-
-.sidebar-label {
-  font-size: 15px;
-  font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-.help {
-  font-size: 12px;
-  font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-div.splitter {
-  margin: 12px 0px 7px 0px;
-  clear: both;
-  border-top: 1px solid #EBEBEB;
-}
-
-input.sidebar {
-  width: 165px
-}
-
-select.sidebar {
-  width: 198px
-}
-
-table.dataintable {
-  font-family: calibri, Arial, Helvetica, sans-serif;
-  font-size: 15px;
-  margin-top: 10px;
-  border-collapse: collapse;
-  border: 1px solid #888;
-}
-
-table.dataintable th {
-  vertical-align: baseline;
-  padding: 5px 15px 5px 5px;
-  background-color: #EEE;
-  border: 1px solid #888;
-  text-align: left;
-}
-
-table.dataintable td {
-  vertical-align: text-top;
-  padding: 5px 15px 5px 5px;
-  background-color: #FFFFFF;
-  border: 1px solid #AAA;
-}
-
-#search {
-  width: 100px;
-  height: 25px;
-  position: relative;
-  left: 0px;
-  top: 5px;
-}
-
-#mytable {
-  width: 100%;
-  height: 300;
-  float: left;
-}
-
-#mychart {
-  height: 400px;
-  width: 100%;
-}
-
-#Menu {
-  height: 100%;
-  width: 245px;
-  float: left;
-}
-
-#header {
-  height: 115px;
-  background-image: url(header.png);
-}
-
-#body {
-  height: 100%;
-  width: 100%;
-  background-image: url(body.png);
-  background-size: 100% 100%;
-}
-
-#footer {
-  color: white;
-  height: 70px;
-  line-height: 70px;
-  text-align: middle;
-  clear: both;
-  text-align: center;
-  background-image: url(foot.png);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/foot.png
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/foot.png b/examples/streaming/transport/src/main/resources/transport/css/foot.png
deleted file mode 100644
index 5db91b5..0000000
Binary files a/examples/streaming/transport/src/main/resources/transport/css/foot.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/header.png
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/header.png b/examples/streaming/transport/src/main/resources/transport/css/header.png
deleted file mode 100644
index 9284e44..0000000
Binary files a/examples/streaming/transport/src/main/resources/transport/css/header.png and /dev/null differ


[2/3] incubator-gearpump git commit: [GEARPUMP-188] use java.time.Instant for Task start time

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/js/transport.js
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/js/transport.js b/examples/streaming/transport/src/main/resources/transport/js/transport.js
deleted file mode 100644
index eef0fe9..0000000
--- a/examples/streaming/transport/src/main/resources/transport/js/transport.js
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-var myChart = echarts.init(document.getElementById("mychart"))
-
-echarts.util.mapData.params.params.football = {
-  getGeoJson: function (callback) {
-    $.ajax({
-      url: "../svg/beijing.svg",
-      dataType: 'xml',
-      success: function (xml) {
-        callback(xml)
-      }
-    });
-  }
-}
-
-function updateRecords(tableId) {
-  $.getJSON("records", function (json) {
-      var tableStr = "<table class=\"dataintable\" style=\"margin-left: 5px;\">";
-      tableStr += "<tr><th>Over Speed Vehicle ID</th><th>Speed</th><th>Location</th><th>Time</th></tr>";
-      var records = json.records;
-      for (var i = 0; i < Math.min(records.length, 20); i++) {
-        var record = records[i];
-        var vehicleId = record.vehicleId;
-        var location = record.locationId.split("_");
-        var speed = record.speed;
-        var row = location[1];
-        var column = location[2];
-        var time = new Date(Number(record.timestamp)).toLocaleTimeString().replace(/^\D*/, '');
-        tableStr += "<tr><td>" + vehicleId + "</td>";
-        tableStr += "<td>" + speed + "km/h </td>"
-        tableStr += "<td>(" + row + ", " + column + ")</td>";
-        tableStr += "<td>" + time + "</td></tr>";
-      }
-      if (records.length < 20) {
-        for (var i = records.length; i < 20; i++) {
-          tableStr += "<tr><td></td>";
-          tableStr += "<td> </td>"
-          tableStr += "<td> </td>";
-          tableStr += "<td> </td></tr>";
-        }
-      }
-      tableStr += "</table>"
-      document.getElementById(tableId).innerHTML = tableStr;
-    }
-  )
-}
-
-function initChart(chartid, vehicleId) {
-  // \u57fa\u4e8e\u51c6\u5907\u597d\u7684dom\uff0c\u521d\u59cb\u5316echarts\u56fe\u8868
-  $.getJSON("trace/" + vehicleId, function (json) {
-    // \u4e3aecharts\u5bf9\u8c61\u52a0\u8f7d\u6570\u636e
-    var records = json.records;
-    var timeLine = new Array(records.length);
-    var markPoints = new Array(records.length);
-    var options_ = new Array(records.length - 2);
-    for (var i = 0; i < records.length; i++) {
-      var record = records[i];
-      var vehicleId = record.vehicleId;
-      var location = record.locationId.split("_");
-      var row = location[1];
-      var column = location[2];
-      var time = new Date(Number(record.timeStamp)).toLocaleTimeString().replace(/^\D*/, '');
-      timeLine[i] = time;
-      var currentPonit = {name: "", value: i, geoCoord: [row * 90, column * 90]};
-      markPoints[i] = currentPonit;
-    }
-    options_[0] =
-    {
-      title: {
-        text: 'Vehicle trace'
-      },
-      tooltip: {
-        trigger: 'item'
-      },
-      toolbox: {
-        show: false,
-        feature: {
-          mark: {show: true},
-          dataView: {show: true, readOnly: false},
-          magicType: {show: true, type: ['line', 'bar']},
-          restore: {show: true},
-          saveAsImage: {show: true}
-        }
-      },
-      series: [
-        {
-          name: 'Vehicle trace',
-          type: 'map',
-          mapType: 'football',
-          mapLocation: {
-            y: 30,
-            height: 430
-          },
-          itemStyle: {
-            normal: {label: {show: false}},
-            emphasis: {label: {show: false}}
-          },
-          data: [
-            {name: 'City', hoverable: false, itemStyle: {normal: {label: {show: false}}}}
-          ],
-          markPoint: {
-            symbol: 'circle',
-            symbolSize: 8,
-            itemStyle: {
-              normal: {
-                borderWidth: 1,
-                color: 'blue',
-                lineStyle: {
-                  type: 'solid'
-                }
-              }
-            },
-            data: [markPoints[0]]
-          },
-          markLine: {
-            smooth: true,
-            effect: {
-              show: true,
-              scaleSize: 1.5,
-              period: 1.5,
-              color: '#fff'
-            },
-            itemStyle: {
-              normal: {
-                borderWidth: 2,
-                color: 'red',
-                lineStyle: {
-                  type: 'solid'
-                }
-              }
-            },
-            data: []
-          }
-        }
-      ]
-    }
-    for (var i = 1; i < markPoints.length; i++) {
-      options_[i] =
-      {
-        series: [
-          {
-            markPoint: {
-              data: [markPoints[i]]
-            },
-            markLine: {
-              data: []
-            }
-          }
-        ]
-      }
-    }
-    var option = {
-      timeline: {
-        type: 'number',
-        playInterval: 500,
-        autoPlay: true,
-        data: timeLine
-      },
-      options: options_
-    };
-    myChart.setOption(option);
-  });
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg b/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg
deleted file mode 100644
index 5342c24..0000000
--- a/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg
+++ /dev/null
@@ -1,199 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<!-- Generator: Adobe Illustrator 14.0.0, SVG Export Plug-In . SVG Version: 6.00 Build 43363)  -->
-<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
-<svg version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px" width="1103px"
-	 height="1115px" viewBox="0 0 200 202" enable-background="new 0 0 1103 1115" xml:space="preserve">
-<g id="\u80cc\u666f">
-
-		<polygon fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		719,377 533,377 487,398 491,563 469,571 469,682 504,682 546,676 661,667 672,672 710,672 740,676 751,645 746,567 725,563
-		719,510 	"/>
-
-		<path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d="
-		M359,533V402l-7-76l36-18l333-10l76,65v315l-38,41h-49l-37,6H534l-81,31h-37c-33.5-0.5-56.5-54.5-57-70V533z"/>
-
-		<path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d="
-		M252,571v44l11,39l16,41v71l7.5,27.5L290,811l11,13l16,1l447-4l48-10l51-45l7-25l8-70V370c1.5-12.5-4.5-24.5-11-32l-103-96l-25-15
-		l-21-6l-392,17l-19,8l-17,24l-35,28l-4,10l4,38l-3,40v47l3,37l-3,40V571z"/>
-
-		<path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d="
-		M259,133l63-8l52-13l57-19h49l241,6l69,26h51c18.5,2.5,27.5,6.5,37,16l82,121l16,28l9,20l11,37l11,21l25,37l7,122l11,80v24v73l6,30
-		l-6,17L944,857l-93,45l-46,62l-17,10l-49,1c-17.5,1.5-37.5,20.5-43,35l-19,52l-18,12l-84,27c-5.5,1.5-21.5,6.5-27-13l-6-37
-		c-2.5-8.5-3.5-22.5-27-23l-151-18c-15.5-1.5-30.5,5.5-50,18l-23,12c-17.5,8.5-41.5-2.5-49-30l-15-60v-35c0,0-49.5-86.5-55-90
-		s-35-9-35-9l-38-65l-5-23l-33-60l-6-22l13-46v-89l2-43l-2-63l10-37l19-111l-3-27c0.5-8.5,5.5-30.5,24-36l29-6l25-15h46l17-10l16-22
-		L259,133z"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		1112,523 990,523 967,530 878,530 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="1030" y1="405" x2="1112" y2="406"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		1112,48 962,122 777.5,343.5 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		856,-7 856,10 848,31 797,81 788,93 763,186 706,298 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		559.5,303.5 565,270 547,207 422,-7 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		-10,1003 13,995 25,988 42,931 42,912 47,898 70,870 74,853 66,789 74,779 110,767 143,742 152,717 280,647 331,633 359.5,617.5
-		"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		456,1115 449,1049 449,1022 467,986 466.5,977.5 455.5,868.5 460,842 460.5,754.5 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		1103,1115 1066,1044 1042,1011 1015,982 984,927 952,888 901,849 778,703 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="1036.5" y1="768.5" x2="1112" y2="810"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="878" y1="671" x2="1112" y2="682"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="1019" y1="-7" x2="955" y2="129"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		-10,533 540,536 671,531 878,530 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		0,468 132,468 168,457 175,457 201,470 212,470 360,470 422,474 458,474 494,472 539,472 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		539,477 590,477 595,474 623,474 630,470 732,470 753,476 763,477 825,502 843,503 929,502 984.5,496.5 1037,500 1091,500
-		1112,503 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		494,565 533.5,564.5 556,562 616,562 626,559 670,559 680,562 725,562 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		1030,405 971,405 885,435 683,435 589.5,435.5 539,439 523,439 372,439 254,439 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		360,618 389,602 403,600 469,602 591.5,601.5 625.5,591.5 652,589 699,588 987,589 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		716,377 748,377 778,344 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		-10,596 6,603 47,603 63,588 91,572 394,572 472,571 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		725,544 689,544 674,559 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		539,406 474.5,410.5 458,415 424.5,417.5 400.5,414.5 378,413 305.5,379.5 283.5,385.5 254,384 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		878,530 864,541 725,549 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		213,688 231,694 278,694 397,682 467,682 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="469" y1="682" x2="460.5" y2="754.5"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		308,755 313,638 313,496 316,487 315.5,420.5 313,399 313,344 282,280 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		352,327 322,338 299,344 254,344 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		162,-7 206,13 240,48 287,120 294,175 312,193 312,206 324,219.5 330,280 346,298 352,327 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		332.5,-9 357,41 364,70 369,109 379,124 381,155 379,159 373,159 370,168 375,265 391,322 401,414 401,474 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		477,93 482,229 486,361 489,372 490,399 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="560" y1="304" x2="556" y2="377"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		537,377 542,536 542,694 539,703 535,716 535,727 530,825 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		613,596 615,671 624,924 666,921 666,930 670,935 678,991 755,1061 766,1115 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="600" y1="227" x2="604" y2="377"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		642,434 640,377 638,302 638,227 642,99 653,10 653,0 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		1037,769 951,709 951,677 870,622 678,623 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		653,11 670,58 668,85 668,110 664,129 664,146 668,176 667,199 667,227 672,369 664,369 664,377 672,609 678,622 680,725 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="878" y1="671" x2="745" y2="671"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		1112,470 973,472 965,474 770,474 763,478 	"/>
-
-		<line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="845" y1="434" x2="842" y2="696"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		763,530 763,383 751,372 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		812.5,125 816,205 861,243 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		667,172 619,174 578,175 494,178 434,180 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		720,271 709,269 392,272 340,275 329,279 282,283 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		826,797 837,810 840.5,831.5 885.5,884.5 967.16,1014.78 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		888,1003 1025,917 1050,905 1064.5,888.5 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		1071,1051 1051,1065 1024,1081 987,1065 976,1051 961,1040 898,1017 861,959 851,935 837,923 813,888 763,838 757,825 751,769
-		749,737 747,682 737,677 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		1009,988 967.28,1015.13 933.5,1117.5 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		0,984 9,974 15,957 15,950 15,939 26,909 34,901 45,897 60,878 58,867 63,856 64,844 61,825 64,818 61,776 104,769 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		917,866 845,900 797,919 781,915 742,923 725,923 667,930 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		-3,1076 93,1076 148,1056 248,1055 270,1047 333,1009 458,909 	"/>
-
-		<polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#14A97E" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
-		239,129 241,154 251,172 270,176 292,176 295,187 312,193 346,192 352,184 371,184 374,237 377,272 391,327 401,414 422,418
-		449,416 472,411 530,406 539,414 542,677 560,703 535,725 530,810 	"/>
-</g>
-<g id="\u5c42_1" display="none">
-
-		<polyline display="inline" fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#0096C0" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
-		797,623.5 797,362.5 778,345.5 792,328.5 725,273.5 706,270.5 386.5,273 340,277.5 333,279.5 299,282.5 	"/>
-</g>
-</svg>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/transport.html
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/transport.html b/examples/streaming/transport/src/main/resources/transport/transport.html
deleted file mode 100644
index baee931..0000000
--- a/examples/streaming/transport/src/main/resources/transport/transport.html
+++ /dev/null
@@ -1,88 +0,0 @@
-<!DOCTYPE html>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~      http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<html>
-
-<head>
-  <meta charset="utf-8">
-  <link rel=stylesheet type=text/css href="css/custom.css">
-  <script src="http://echarts.baidu.com/build/source/echarts-all.js"></script>
-  <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
-</head>
-
-<body style="background-color:#F2F2F2">
-<div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;">
-  <div style="height:0px"></div>
-  <div id="header">
-    <div
-      style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
-      Big Data Transport Monitoring Demo
-    </div>
-  </div>
-  <div id="body">
-    <div id="Menu">
-      <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;">
-        <!-- form to post to accompany to get accompanying cars -->
-
-        <table style="width:100%">
-          <tr>
-            <td class="sidebar-label">Vehicle Id:</td>
-          </tr>
-          <tr>
-            <td class="sidebar-label"></td>
-          </tr>
-          <tr>
-            <td style="vertical-align:top;">
-              <input id="vehicleId" class="sidebar" type="text" name="vehicleId"/>
-            </td>
-          </tr>
-        </table>
-        <div class="splitter"></div>
-        <div>
-          <button id="search" onclick="search_onclick()">Search</button>
-        </div>
-      </div>
-    </div>
-    <div id="content"
-         style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
-      <div
-        style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
-        Vehicle Trace:
-      </div>
-      <div style="height:7px;background-color:#92BDF2;"></div>
-
-      <div id="mychart"></div>
-
-      <div id="mytable"></div>
-    </div>
-  </div>
-  <div id="footer">
-    Big Data Team @ Intel
-  </div>
-  <script src="js/transport.js"></script>
-  <script type="text/javascript">
-    function search_onclick() {
-      var vehicleId = document.getElementById('vehicleId').value
-      initChart("mychart", vehicleId)
-    }
-    setInterval(updateRecords, 1000, "mytable")
-  </script>
-</div>
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
deleted file mode 100644
index 0aaf72c..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-case class LocationInfo(id: String, row: Int, column: Int)
-
-// scalastyle:off equals.hash.code
-case class PassRecord(vehicleId: String, locationId: String, timeStamp: Long) {
-  override def hashCode: Int = vehicleId.hashCode
-}
-// scalastyle:on equals.hash.code
-
-case class GetTrace(vehicleId: String)
-
-case class VehicleTrace(records: Array[PassRecord])
-
-case class OverSpeedReport(vehicleId: String, speed: String, timestamp: Long, locationId: String)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
deleted file mode 100644
index 555e850..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import scala.concurrent.duration._
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-
-class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import taskContext.{output, parallelism, scheduleOnce, taskId}
-  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
-  private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism
-  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
-  private val mockCity = new MockCity(citySize)
-  private val recordGenerators: Array[PassRecordGenerator] =
-    PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold)
-
-  override def onStart(startTime: StartTime): Unit = {
-    self ! Message("start", System.currentTimeMillis())
-  }
-
-  override def onNext(msg: Message): Unit = {
-    recordGenerators.foreach(generator =>
-      output(Message(generator.getNextPassRecord(), System.currentTimeMillis())))
-    scheduleOnce(1.second)(self ! Message("continue", System.currentTimeMillis()))
-  }
-
-  private def getIdentifier(taskId: TaskId): String = {
-    // scalastyle:off non.ascii.character.disallowed
-    s"\u6caaA${taskId.processorId}${taskId.index}"
-    // scalastyle:on non.ascii.character.disallowed
-  }
-}
-
-object DataSource {
-  final val VEHICLE_NUM = "vehicle.number"
-  final val MOCK_CITY_SIZE = "mock.city.size"
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
deleted file mode 100644
index ff3b4b4..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-import akka.actor.Actor._
-import akka.actor.{Actor, ActorRefFactory, Props}
-import akka.io.IO
-import akka.pattern.ask
-import spray.can.Http
-import spray.http.StatusCodes
-import spray.json._
-import spray.routing.{HttpService, Route}
-import upickle.default.write
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import org.apache.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication}
-import org.apache.gearpump.util.Graph
-
-class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import system.dispatcher
-  import taskContext.appMaster
-
-  var inspector: (ProcessorId, ProcessorDescription) = null
-  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-  private var overSpeedRecords = List.empty[OverSpeedReport]
-
-  override def onStart(startTime: StartTime): Unit = {
-    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
-      StreamApplication.DAG).get)
-    inspector = dag.processors.find { kv =>
-      val (_, processor) = kv
-      processor.taskClass == classOf[VelocityInspector].getName
-    }.get
-    taskContext.actorOf(Props(new WebServer))
-  }
-
-  override def onNext(msg: Message): Unit = {
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case getTrace@GetTrace(vehicleId: String) =>
-      val parallism = inspector._2.parallelism
-      val processorId = inspector._1
-      val analyzerTaskId = TaskId(processorId, (vehicleId.hashCode & Integer.MAX_VALUE) % parallism)
-      val requester = sender
-      (appMaster ? LookupTaskActorRef(analyzerTaskId))
-        .asInstanceOf[Future[TaskActorRef]].flatMap { task =>
-        (task.task ? getTrace).asInstanceOf[Future[VehicleTrace]]
-      }.map { trace =>
-        LOG.info(s"reporting $trace")
-        requester ! trace
-      }
-    case record@OverSpeedReport(vehicleId, speed, timestamp, locationId) =>
-      LOG.info(s"vehicle $vehicleId is over speed, the speed is $speed km/h")
-      overSpeedRecords :+= record
-    case GetAllRecords =>
-      sender ! QueryServer.OverSpeedRecords(overSpeedRecords.toArray.sortBy(_.timestamp))
-      overSpeedRecords = List.empty[OverSpeedReport]
-    case _ =>
-    // Ignore
-  }
-}
-
-object QueryServer {
-  object GetAllRecords
-
-  case class OverSpeedRecords(records: Array[OverSpeedReport])
-
-  class WebServer extends Actor with HttpService {
-
-    import context.dispatcher
-    implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-    def actorRefFactory: ActorRefFactory = context
-    implicit val system = context.system
-
-    IO(Http) ! Http.Bind(self, interface = "0.0.0.0", port = 8080)
-
-    override def receive: Receive = runRoute(webServer ~ staticRoute)
-
-    def webServer: Route = {
-      path("trace" / Segment) { vehicleId =>
-        get {
-          onComplete((context.parent ? GetTrace(vehicleId)).asInstanceOf[Future[VehicleTrace]]) {
-            case Success(trace: VehicleTrace) =>
-              val json = write(trace)
-              complete(pretty(json))
-            case Failure(ex) => complete(StatusCodes.InternalServerError,
-              s"An error occurred: ${ex.getMessage}")
-          }
-        }
-      } ~
-      path("records") {
-        get {
-          onComplete((context.parent ? GetAllRecords).asInstanceOf[Future[OverSpeedRecords]]) {
-            case Success(records: OverSpeedRecords) =>
-              val json = write(records)
-              complete(pretty(json))
-            case Failure(ex) => complete(StatusCodes.InternalServerError,
-              s"An error occurred: ${ex.getMessage}")
-          }
-        }
-      }
-    }
-
-    val staticRoute = {
-      pathEndOrSingleSlash {
-        getFromResource("transport/transport.html")
-      } ~
-        pathPrefix("css") {
-          get {
-            getFromResourceDirectory("transport/css")
-          }
-        } ~
-        pathPrefix("svg") {
-          get {
-            getFromResourceDirectory("transport/svg")
-          }
-        } ~
-        pathPrefix("js") {
-          get {
-            getFromResourceDirectory("transport/js")
-          }
-        }
-    }
-
-    private def pretty(json: String): String = {
-      json.parseJson.prettyPrint
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
deleted file mode 100644
index 5beb2e1..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
-import org.apache.gearpump.streaming.{Processor, StreamApplication}
-import org.apache.gearpump.util.Graph._
-import org.apache.gearpump.util.{AkkaApp, Graph}
-
-/** A city smart transportation streaming application */
-object Transport extends AkkaApp with ArgumentsParser {
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "source" -> CLIOption[Int]("<how many task to generate data>", required = false,
-      defaultValue = Some(10)),
-    "inspector" -> CLIOption[Int]("<how many over speed inspector>", required = false,
-      defaultValue = Some(4)),
-    "vehicle" -> CLIOption[Int]("<how many vehicles's to generate>", required = false,
-      defaultValue = Some(1000)),
-    "citysize" -> CLIOption[Int]("<the blocks number of the mock city>", required = false,
-      defaultValue = Some(10)),
-    "threshold" -> CLIOption[Int]("<overdrive threshold, km/h>", required = false,
-      defaultValue = Some(60)))
-
-  def application(config: ParseResult): StreamApplication = {
-    val sourceNum = config.getInt("source")
-    val inspectorNum = config.getInt("inspector")
-    val vehicleNum = config.getInt("vehicle")
-    val citysize = config.getInt("citysize")
-    val threshold = config.getInt("threshold")
-    val source = Processor[DataSource](sourceNum)
-    val inspector = Processor[VelocityInspector](inspectorNum)
-    val queryServer = Processor[QueryServer](1)
-    val partitioner = new HashPartitioner
-
-    val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
-      withInt(DataSource.MOCK_CITY_SIZE, citysize).
-      withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold).
-      withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
-    StreamApplication("transport", Graph(source ~ partitioner ~> inspector,
-      Node(queryServer)), userConfig)
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-    val context = ClientContext(akkaConf)
-    implicit val system = context.system
-    context.submit(application(config))
-    context.close()
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
deleted file mode 100644
index 4d9bd04..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import java.util.concurrent.TimeUnit
-import scala.collection.immutable.Queue
-import scala.collection.mutable
-import scala.concurrent.Future
-
-import akka.actor.Actor._
-import akka.actor.ActorRef
-import akka.pattern.ask
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.examples.transport.generator.MockCity
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import org.apache.gearpump.streaming.{DAG, ProcessorDescription, StreamApplication}
-import org.apache.gearpump.util.Graph
-
-class VelocityInspector(taskContext: TaskContext, conf: UserConfig)
-  extends Task(taskContext, conf) {
-
-  import system.dispatcher
-  import taskContext.appMaster
-  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-  private val passRecords = mutable.Map.empty[String, Queue[PassRecord]]
-  private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get
-  private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
-  private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
-  private val mockCity = new MockCity(citySize)
-  private var queryServerActor: ActorRef = null
-
-  override def onStart(startTime: StartTime): Unit = {
-    val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
-      StreamApplication.DAG).get)
-    val queryServer = dag.processors.find { kv =>
-      val (_, processor) = kv
-      processor.taskClass == classOf[QueryServer].getName
-    }.get
-    val queryServerTaskId = TaskId(queryServer._1, 0)
-    (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]]
-      .map { task =>
-        queryServerActor = task.task
-      }
-  }
-
-  import org.apache.gearpump.streaming.examples.transport.VelocityInspector._
-  override def onNext(msg: Message): Unit = {
-    msg.msg match {
-      case passRecord: PassRecord =>
-        val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord])
-        if (records.size > 0) {
-          val velocity = getVelocity(passRecord, records.last)
-          val formatted = "%.2f".format(velocity)
-          if (velocity > overdriveThreshold) {
-            if (velocity > fakePlateThreshold) {
-              LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, " +
-                s"the speed is $formatted km/h")
-            }
-            if (queryServerActor != null) {
-              queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted,
-                passRecord.timeStamp, passRecord.locationId)
-            }
-          }
-        }
-        passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM))
-    }
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case GetTrace(vehicleId) =>
-      val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord])
-      sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp))
-  }
-
-  private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = {
-    val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId)
-    val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60)
-    distanceInKm / timeInHour
-  }
-
-  private def getDistance(location1: String, location2: String): Long = {
-    mockCity.getDistance(location1, location2)
-  }
-}
-
-object VelocityInspector {
-  final val OVER_DRIVE_THRESHOLD = "overdrive.threshold"
-  final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold"
-  final val RECORDS_NUM = 20
-
-  class FiniteQueue[T](q: Queue[T]) {
-    def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = {
-      var result = q.enqueue(elem)
-      while (result.size > maxSize) {
-        result = result.dequeue._2
-      }
-      result
-    }
-  }
-
-  import scala.language.implicitConversions
-
-  implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
deleted file mode 100644
index 60e0bcf..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import scala.util.Random
-
-import org.apache.gearpump.streaming.examples.transport.generator.MockCity._
-
-class MockCity(size: Int) {
-  private val random = new Random()
-  private val directions = Array(UP, DOWN, LEFT, RIGHT)
-
-  def nextLocation(currentLocationId: String): String = {
-    val coordinate = idToCoordinate(currentLocationId)
-    val direction = directions(random.nextInt(4))
-    val newCoordinate = coordinate.addOffset(direction)
-    if (inCity(newCoordinate)) {
-      coordinateToId(newCoordinate)
-    } else {
-      nextLocation(currentLocationId)
-    }
-  }
-
-  def getDistance(locationId1: String, locationId2: String): Long = {
-    val coordinate1 = idToCoordinate(locationId1)
-    val coordinate2 = idToCoordinate(locationId2)
-    val blocks = Math.abs(coordinate1.row - coordinate2.row) +
-      Math.abs(coordinate1.column - coordinate2.column)
-    blocks * LENGTH_PER_BLOCK
-  }
-
-  def randomLocationId(): String = {
-    val row = random.nextInt(size)
-    val column = random.nextInt(size)
-    coordinateToId(Coordinate(row, column))
-  }
-
-  private def coordinateToId(coordinate: Coordinate): String = {
-    s"Id_${coordinate.row}_${coordinate.column}"
-  }
-
-  private def idToCoordinate(locationId: String): Coordinate = {
-    val attr = locationId.split("_")
-    val row = attr(1).toInt
-    val column = attr(2).toInt
-    Coordinate(row, column)
-  }
-
-  private def inCity(coordinate: Coordinate): Boolean = {
-    coordinate.row >= 0 &&
-      coordinate.row < size &&
-      coordinate.column >= 0 &&
-      coordinate.column < size
-  }
-}
-
-object MockCity {
-  // The length of the mock city, km
-  final val LENGTH_PER_BLOCK = 5
-  // The minimal speed, km/h
-  final val MINIMAL_SPEED = 10
-
-  final val UP = Coordinate(0, 1)
-  final val DOWN = Coordinate(0, -1)
-  final val LEFT = Coordinate(-1, 0)
-  final val RIGHT = Coordinate(1, 0)
-
-  case class Coordinate(row: Int, column: Int) {
-    def addOffset(coordinate: Coordinate): Coordinate = {
-      Coordinate(this.row + coordinate.row, this.column + coordinate.column)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
deleted file mode 100644
index e8c1c59..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import scala.util.Random
-
-import org.apache.gearpump.streaming.examples.transport.PassRecord
-import org.apache.gearpump.util.LogUtil
-
-class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) {
-  private val LOG = LogUtil.getLogger(getClass)
-  LOG.info(s"Generate pass record for vehicle $vehicleId")
-  private var timeStamp = System.currentTimeMillis()
-
-  private var locationId = city.randomLocationId()
-  private val random = new Random()
-  private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE
-  private val (randomMin, randomRange) = {
-    val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat
-    val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat
-    val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE
-    val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES)
-    val randomRange = upperBound - randomMin
-    (randomMin.toInt, randomRange.toInt)
-  }
-
-  def getNextPassRecord(): PassRecord = {
-    locationId = if (fakePlate) {
-      city.randomLocationId()
-    } else {
-      city.nextLocation(locationId)
-    }
-    timeStamp += (random.nextInt(randomRange) + randomMin)
-    PassRecord(vehicleId, locationId, timeStamp)
-  }
-}
-
-object PassRecordGenerator {
-  final val FAKE_PLATE_RATE = 0.01F
-  final val OVERDRIVE_RATE = 0.05F
-  final val TWOMINUTES = 2 * 60 * 1000
-
-  def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int)
-    : Array[PassRecordGenerator] = {
-    var result = Map.empty[String, PassRecordGenerator]
-    val digitsNum = (Math.log10(generatorNum) + 1).toInt
-    for (i <- 1 to generatorNum) {
-      val vehicleId = prefix + s"%0${digitsNum}d".format(i)
-      val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold)
-      result += vehicleId -> generator
-    }
-    result.values.toArray
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
deleted file mode 100644
index 1f525ae..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
-
-class DataSourceSpec extends FlatSpec with Matchers {
-  it should "create the pass record" in {
-    val vehicleNum = 2
-    val context = MockUtil.mockTaskContext
-
-    val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
-      withInt(DataSource.MOCK_CITY_SIZE, 10).
-      withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, 60).
-      withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
-
-    val source = new DataSource(context, userConfig)
-    source.onStart(StartTime(0))
-    source.onNext(Message("start"))
-    verify(context, times(vehicleNum)).output(any[Message])
-    source.onStop()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
deleted file mode 100644
index 2f83de5..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
-
-import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
-import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-
-class TransportSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
-
-  override def beforeAll {
-    startActorSystem()
-  }
-
-  override def afterAll {
-    shutdownActorSystem()
-  }
-
-  protected override def config = TestUtil.DEFAULT_CONFIG
-
-  property("Transport should succeed to submit application with required arguments") {
-    val requiredArgs = Array.empty[String]
-    val optionalArgs = Array(
-      "-source", "1",
-      "-inspector", "1",
-      "-vehicle", "100",
-      "-citysize", "10",
-      "-threshold", "60")
-
-    val args = {
-      Table(
-        ("requiredArgs", "optionalArgs"),
-        (requiredArgs, optionalArgs)
-      )
-    }
-    val masterReceiver = createMockMaster()
-    forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
-      val args = requiredArgs ++ optionalArgs
-
-      Future {
-        Transport.main(masterConfig, args)
-      }
-      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
-      masterReceiver.reply(SubmitApplicationResult(Success(0)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
deleted file mode 100644
index ba4eb2d..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class MockCitySpec extends PropSpec with PropertyChecks with Matchers {
-
-  property("MockCity should maintain the location properly") {
-    val city = new MockCity(10)
-    val start = city.randomLocationId()
-    val nextLocation = city.nextLocation(start)
-    assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
deleted file mode 100644
index f0eebbf..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers {
-
-  property("PassRecordGenerator should generate pass record") {
-    val id = "test"
-    val city = new MockCity(10)
-    val generator = new PassRecordGenerator(id, city, 60)
-    val passrecord1 = generator.getNextPassRecord()
-    val passrecord2 = generator.getNextPassRecord()
-    assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) ==
-      MockCity.LENGTH_PER_BLOCK)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
index 76069c1..0a8fb4f 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
@@ -21,9 +21,10 @@ package org.apache.gearpump.streaming.examples.wordcountjava;
 import org.apache.gearpump.Message;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.streaming.javaapi.Task;
-import org.apache.gearpump.streaming.task.StartTime;
 import org.apache.gearpump.streaming.task.TaskContext;
 
+import java.time.Instant;
+
 public class Split extends Task {
 
   public static String TEXT = "This is a good start for java! bingo! bingo! ";
@@ -37,7 +38,7 @@ public class Split extends Task {
   }
 
   @Override
-  public void onStart(StartTime startTime) {
+  public void onStart(Instant startTime) {
     self().tell(new Message("start", now()), self());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
index 89c3b14..3daa6e0 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
@@ -21,10 +21,10 @@ package org.apache.gearpump.streaming.examples.wordcountjava;
 import org.apache.gearpump.Message;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.streaming.javaapi.Task;
-import org.apache.gearpump.streaming.task.StartTime;
 import org.apache.gearpump.streaming.task.TaskContext;
 import org.slf4j.Logger;
 
+import java.time.Instant;
 import java.util.HashMap;
 
 public class Sum extends Task {
@@ -37,7 +37,7 @@ public class Sum extends Task {
   }
 
   @Override
-  public void onStart(StartTime startTime) {
+  public void onStart(Instant startTime) {
     //skip
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
index ae63f10..af3c04c 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
@@ -18,16 +18,17 @@
 
 package org.apache.gearpump.streaming.examples.wordcount
 
+import java.time.Instant
 import java.util.concurrent.TimeUnit
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     self ! Message("start")
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
index c3fa82a..dbefc93 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.streaming.examples.wordcount
 
+import java.time.Instant
 import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 import scala.concurrent.duration.FiniteDuration
@@ -26,7 +27,7 @@ import akka.actor.Cancellable
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   private[wordcount] val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
@@ -37,7 +38,7 @@ class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
 
   private var scheduler: Cancellable = null
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
       new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
index cef9337..8b50890 100644
--- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
+++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming.examples.wordcount
 
+import java.time.Instant
+
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 
@@ -29,7 +31,6 @@ import org.scalatest.{Matchers, WordSpec}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 
 class SplitSpec extends WordSpec with Matchers {
 
@@ -47,10 +48,10 @@ class SplitSpec extends WordSpec with Matchers {
 
       val conf = UserConfig.empty
       val split = new Split(taskContext, conf)
-      split.onStart(StartTime(0))
+      split.onStart(Instant.EPOCH)
       mockTaskActor.expectMsgType[Message]
 
-      val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").filter(_.nonEmpty).length
+      val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").count(_.nonEmpty)
 
       split.onNext(Message("next"))
       verify(taskContext, times(expectedWordCount)).output(anyObject())

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
index e42d696..17e1765 100644
--- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
+++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming.examples.wordcount
 
+import java.time.Instant
+
 import org.scalacheck.Gen
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
@@ -24,7 +26,6 @@ import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 
 class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
   val stringGenerator = Gen.alphaStr
@@ -39,7 +40,7 @@ class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndA
 
     val sum = new Sum(taskContext, conf)
 
-    sum.onStart(StartTime(0))
+    sum.onStart(Instant.EPOCH)
 
     forAll(stringGenerator) { txt =>
       wordcount += 1

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
index 1d3048e..e3b45fb 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.experiments.storm.processor
 
+import java.time.Instant
 import java.util.concurrent.TimeUnit
 import scala.concurrent.duration.Duration
 
@@ -46,7 +47,7 @@ private[storm] class StormProcessor(gearpumpBolt: GearpumpBolt,
 
   private val freqOpt = gearpumpBolt.getTickFrequency
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     gearpumpBolt.start(startTime)
     freqOpt.foreach(scheduleTick)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
index 5d4a6a2..b92f037 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.experiments.storm.producer
 
+import java.time.Instant
 import java.util.concurrent.TimeUnit
 
 import akka.actor.Actor.Receive
@@ -48,7 +49,7 @@ private[storm] class StormProducer(gearpumpSpout: GearpumpSpout,
 
   private val timeoutMillis = gearpumpSpout.getMessageTimeout
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     gearpumpSpout.start(startTime)
     if (gearpumpSpout.ackEnabled) {
       getCheckpointClock

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
index d0f2949..a8e061c 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
@@ -19,6 +19,7 @@
 package org.apache.gearpump.experiments.storm.topology
 
 import java.io.{File, FileOutputStream, IOException}
+import java.time.Instant
 import java.util
 import java.util.jar.JarFile
 import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
@@ -40,7 +41,7 @@ import org.apache.gearpump.experiments.storm.util.StormConstants._
 import org.apache.gearpump.experiments.storm.util.StormUtil._
 import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUtil}
 import org.apache.gearpump.streaming.DAG
-import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext, StartTime}
+import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext}
 import org.apache.gearpump.util.{Constants, LogUtil}
 import org.apache.gearpump.{Message, TimeStamp}
 import org.slf4j.Logger
@@ -57,7 +58,7 @@ trait GearpumpStormComponent {
    * invoked at Task.onStart
    * @param startTime task start time
    */
-  def start(startTime: StartTime): Unit
+  def start(startTime: Instant): Unit
 
   /**
    * invoked at Task.onNext
@@ -123,7 +124,7 @@ object GearpumpStormComponent {
 
     private var collector: StormSpoutOutputCollector = null
 
-    override def start(startTime: StartTime): Unit = {
+    override def start(startTime: Instant): Unit = {
       val dag = getDAG(taskContext.appMaster)
       val topologyContext = getTopologyContext(dag, taskContext.taskId)
       collector = getOutputCollector(taskContext, topologyContext)
@@ -206,7 +207,7 @@ object GearpumpStormComponent {
     private var generalTopologyContext: GeneralTopologyContext = null
     private var tickTuple: Tuple = null
 
-    override def start(startTime: StartTime): Unit = {
+    override def start(startTime: Instant): Unit = {
       val dag = getDAG(taskContext.appMaster)
       topologyContext = getTopologyContext(dag, taskContext.taskId)
       generalTopologyContext = getGeneralTopologyContext(dag)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
index 2111df6..9bbac58 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
@@ -18,11 +18,12 @@
 
 package org.apache.gearpump.experiments.storm.processor
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{Matchers, WordSpec}
@@ -31,7 +32,7 @@ class StormProcessorSpec extends WordSpec with Matchers with MockitoSugar {
 
   "StormProcessor" should {
     "start GearpumpSpout onStart" in {
-      val startTime = mock[StartTime]
+      val startTime = Instant.EPOCH
       val gearpumpBolt = mock[GearpumpBolt]
       when(gearpumpBolt.getTickFrequency).thenReturn(None)
       val taskContext = MockUtil.mockTaskContext

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
index 39a008f..ee89a4a 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
@@ -18,12 +18,13 @@
 
 package org.apache.gearpump.experiments.storm.producer
 
+import java.time.Instant
+
 import akka.testkit.TestProbe
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{Matchers, WordSpec}
@@ -32,7 +33,7 @@ class StormProducerSpec extends WordSpec with Matchers with MockitoSugar {
 
   "StormProducer" should {
     "start GearpumpSpout onStart" in {
-      val startTime = mock[StartTime]
+      val startTime = Instant.EPOCH
       val gearpumpSpout = mock[GearpumpSpout]
       when(gearpumpSpout.getMessageTimeout).thenReturn(None)
       val taskContext = MockUtil.mockTaskContext

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
index bdea50c..0891070 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.gearpump.experiments.storm.topology
 
+import java.time.Instant
 import java.util.{Map => JMap}
 
 import akka.actor.ActorRef
@@ -26,7 +27,7 @@ import backtype.storm.tuple.Tuple
 import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
 import org.apache.gearpump.experiments.storm.util.StormOutputCollector
-import org.apache.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
+import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
 import org.apache.gearpump.streaming.{DAG, MockUtil}
 import org.apache.gearpump.{Message, TimeStamp}
 import org.mockito.Matchers.{anyObject, eq => mockitoEq}
@@ -59,8 +60,7 @@ class GearpumpStormComponentSpec
       getOutputCollector, ackEnabled = false, taskContext)
 
     // Start
-    val startTime = mock[StartTime]
-    gearpumpSpout.start(startTime)
+    gearpumpSpout.start(Instant.EPOCH)
 
     verify(spout).open(mockitoEq(config), mockitoEq(topologyContext),
       anyObject[SpoutOutputCollector])
@@ -100,8 +100,7 @@ class GearpumpStormComponentSpec
         getGeneralTopologyContext, getOutputCollector, getTickTuple, taskContext)
 
       // Start
-      val startTime = mock[StartTime]
-      gearpumpBolt.start(startTime)
+      gearpumpBolt.start(Instant.EPOCH)
 
       verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext),
         anyObject[OutputCollector])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
index ef383ad..b92b2e1 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
@@ -79,7 +79,6 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
     "get target processors from source id" in {
       val stormTopology = TopologyUtil.getTestTopology
       implicit val system = MockUtil.system
-      val sysConfig = new JHashMap[AnyRef, AnyRef]
       val gearpumpStormTopology =
         GearpumpStormTopology("app", stormTopology, null)
       val targets0 = gearpumpStormTopology.getTargets("1")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
index da08b04..314eae8 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.streaming.kafka.lib.source
 
+import java.time.Instant
 import java.util.Properties
 
 import com.twitter.bijection.Injection
@@ -87,11 +88,11 @@ abstract class AbstractKafkaSource(
     this.checkpointStoreFactory = Some(checkpointStoreFactory)
   }
 
-  override def open(context: TaskContext, startTime: TimeStamp): Unit = {
+  override def open(context: TaskContext, startTime: Instant): Unit = {
     import context.{parallelism, taskId}
 
     LOG.info("KafkaSource opened at start time {}", startTime)
-    this.startTime = startTime
+    this.startTime = startTime.toEpochMilli
     val topicList = topic.split(",", -1).toList
     val grouper = config.getConfiguredInstance(KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG,
       classOf[PartitionGrouper])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
index e40276f..6ccb231 100644
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.streaming.kafka
 
+import java.time.Instant
 import java.util.Properties
 
 import com.twitter.bijection.Injection
@@ -42,7 +43,7 @@ import org.scalatest.{Matchers, PropSpec}
 
 class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
-  val startTimeGen = Gen.choose[Long](0L, 100L)
+  val startTimeGen = Gen.choose[Long](0L, 100L).map(Instant.ofEpochMilli)
   val offsetGen = Gen.choose[Long](0L, 100L)
   val topicAndPartitionGen = for {
     topic <- Gen.alphaStr suchThat (_.nonEmpty)
@@ -51,7 +52,7 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
   val tpsGen = Gen.listOf[TopicAndPartition](topicAndPartitionGen) suchThat (_.nonEmpty)
 
   property("KafkaSource open should not recover without checkpoint") {
-    forAll(startTimeGen, tpsGen) { (startTime: Long, tps: List[TopicAndPartition]) =>
+    forAll(startTimeGen, tpsGen) { (startTime: Instant, tps: List[TopicAndPartition]) =>
       val taskContext = MockUtil.mockTaskContext
       val fetchThread = mock[FetchThread]
       val kafkaClient = mock[KafkaClient]
@@ -84,7 +85,7 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
 
   property("KafkaSource open should recover with checkpoint") {
     forAll(startTimeGen, offsetGen, tpsGen) {
-      (startTime: Long, offset: Long, tps: List[TopicAndPartition]) =>
+      (startTime: Instant, offset: Long, tps: List[TopicAndPartition]) =>
         val taskContext = MockUtil.mockTaskContext
         val checkpointStoreFactory = mock[CheckpointStoreFactory]
         val checkpointStores = tps.map(_ -> mock[CheckpointStore]).toMap
@@ -115,7 +116,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
         checkpointStores.foreach { case (tp, store) =>
           when(checkpointStoreFactory.getCheckpointStore(
             KafkaConfig.getCheckpointStoreNameSuffix(tp))).thenReturn(store)
-          when(store.recover(startTime)).thenReturn(Some(Injection[Long, Array[Byte]](offset)))
+          when(store.recover(startTime.toEpochMilli))
+            .thenReturn(Some(Injection[Long, Array[Byte]](offset)))
         }
 
         source.setCheckpointStore(checkpointStoreFactory)