You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/08/15 15:04:34 UTC

[3/3] incubator-gearpump git commit: [GEARPUMP-188] use java.time.Instant for Task start time

[GEARPUMP-188] use java.time.Instant for Task start time

Author: manuzhang <ow...@gmail.com>

Closes #74 from manuzhang/replace_timestamp_with_instant.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/23daf0cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/23daf0cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/23daf0cf

Branch: refs/heads/master
Commit: 23daf0cf9c1db3fabc1b679993fcf1d6edb43e7d
Parents: 6d919ec
Author: manuzhang <ow...@gmail.com>
Authored: Mon Aug 15 23:04:11 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Aug 15 23:04:11 2016 +0800

----------------------------------------------------------------------
 .../pagerank/PageRankController.scala           |   4 +-
 .../streaming/examples/complexdag/Node.scala    |   6 +-
 .../streaming/examples/complexdag/Sink.scala    |   6 +-
 .../streaming/examples/complexdag/Source.scala  |   6 +-
 .../examples/fsio/SeqFileStreamProcessor.scala  |   8 +-
 .../examples/fsio/SeqFileStreamProducer.scala   |   9 +-
 .../fsio/SeqFileStreamProcessorSpec.scala       |   5 +-
 .../fsio/SeqFileStreamProducerSpec.scala        |   5 +-
 .../examples/kafka/wordcount/Split.scala        |   7 +-
 .../examples/kafka/wordcount/Sum.scala          |   7 +-
 .../examples/kafka/wordcount/SumSpec.scala      |   5 +-
 .../examples/sol/SOLStreamProcessor.scala       |   5 +-
 .../examples/sol/SOLStreamProducer.scala        |   5 +-
 .../examples/sol/SOLStreamProcessorSpec.scala   |   5 +-
 .../examples/sol/SOLStreamProducerSpec.scala    |   5 +-
 .../processor/NumberGeneratorProcessor.scala    |   8 +-
 .../state/processor/CountProcessorSpec.scala    |   6 +-
 .../NumberGeneratorProcessorSpec.scala          |   5 +-
 .../processor/WindowAverageProcessorSpec.scala  |   6 +-
 examples/streaming/stockcrawler/README.md       |  19 --
 .../src/main/resources/geardefault.conf         |   9 -
 .../src/main/resources/stock/css/body.png       | Bin 201 -> 0 bytes
 .../src/main/resources/stock/css/custom.css     | 115 -----------
 .../src/main/resources/stock/css/foot.png       | Bin 2250 -> 0 bytes
 .../src/main/resources/stock/css/header.png     | Bin 17350 -> 0 bytes
 .../src/main/resources/stock/js/stock.js        | 157 ---------------
 .../src/main/resources/stock/stock.html         |  87 --------
 .../streaming/examples/stock/Analyzer.scala     | 170 ----------------
 .../streaming/examples/stock/Crawler.scala      |  60 ------
 .../streaming/examples/stock/Data.scala         |  61 ------
 .../streaming/examples/stock/QueryServer.scala  | 134 -------------
 .../streaming/examples/stock/StockMarket.scala  | 155 ---------------
 .../streaming/examples/stock/main/Stock.scala   |  86 --------
 examples/streaming/transport/README.md          |   3 -
 .../src/main/resources/geardefault.conf         |  12 --
 .../src/main/resources/transport/css/body.png   | Bin 201 -> 0 bytes
 .../src/main/resources/transport/css/custom.css | 115 -----------
 .../src/main/resources/transport/css/foot.png   | Bin 2250 -> 0 bytes
 .../src/main/resources/transport/css/header.png | Bin 17350 -> 0 bytes
 .../main/resources/transport/js/transport.js    | 180 -----------------
 .../main/resources/transport/svg/beijing.svg    | 199 -------------------
 .../src/main/resources/transport/transport.html |  88 --------
 .../streaming/examples/transport/Data.scala     |  32 ---
 .../examples/transport/DataSource.scala         |  56 ------
 .../examples/transport/QueryServer.scala        | 154 --------------
 .../examples/transport/Transport.scala          |  69 -------
 .../examples/transport/VelocityInspector.scala  | 123 ------------
 .../examples/transport/generator/MockCity.scala |  88 --------
 .../generator/PassRecordGenerator.scala         |  69 -------
 .../examples/transport/DataSourceSpec.scala     |  45 -----
 .../examples/transport/TransportSpec.scala      |  69 -------
 .../transport/generator/MockCitySpec.scala      |  31 ---
 .../generator/PassRecordGeneratorSpec.scala     |  34 ----
 .../streaming/examples/wordcountjava/Split.java |   5 +-
 .../streaming/examples/wordcountjava/Sum.java   |   4 +-
 .../streaming/examples/wordcount/Split.scala    |   5 +-
 .../streaming/examples/wordcount/Sum.scala      |   5 +-
 .../examples/wordcount/SplitSpec.scala          |   7 +-
 .../streaming/examples/wordcount/SumSpec.scala  |   5 +-
 .../storm/processor/StormProcessor.scala        |   3 +-
 .../storm/producer/StormProducer.scala          |   3 +-
 .../storm/topology/GearpumpStormComponent.scala |   9 +-
 .../storm/processor/StormProcessorSpec.scala    |   5 +-
 .../storm/producer/StormProducerSpec.scala      |   5 +-
 .../topology/GearpumpStormComponentSpec.scala   |   9 +-
 .../topology/GearpumpStormTopologySpec.scala    |   1 -
 .../kafka/lib/source/AbstractKafkaSource.scala  |   5 +-
 .../streaming/kafka/KafkaSourceSpec.scala       |  10 +-
 .../apache/gearpump/streaming/javaapi/Task.java |   5 +-
 .../gearpump/streaming/dsl/StreamApp.scala      |  32 +--
 .../streaming/dsl/plan/OpTranslator.scala       |  24 +--
 .../gearpump/streaming/sink/DataSinkTask.scala  |   7 +-
 .../gearpump/streaming/source/DataSource.scala  |   6 +-
 .../streaming/source/DataSourceTask.scala       |   8 +-
 .../streaming/state/api/PersistentTask.scala    |   8 +-
 .../gearpump/streaming/task/StartTime.scala     |  24 ---
 .../apache/gearpump/streaming/task/Task.scala   |   6 +-
 .../gearpump/streaming/task/TaskActor.scala     |   7 +-
 .../gearpump/streaming/task/TaskWrapper.scala   |  12 +-
 .../streaming/appmaster/AppMasterSpec.scala     |  10 +-
 .../streaming/appmaster/TaskManagerSpec.scala   |   4 +-
 .../streaming/appmaster/TaskSchedulerSpec.scala |  11 +-
 .../gearpump/streaming/dsl/StreamSpec.scala     |   4 +-
 .../streaming/dsl/plan/OpTranslatorSpec.scala   |  12 +-
 .../streaming/sink/DataSinkTaskSpec.scala       |   8 +-
 .../streaming/source/DataSourceTaskSpec.scala   |   7 +-
 .../streaming/task/SubscriptionSpec.scala       |   7 +-
 87 files changed, 199 insertions(+), 2617 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
index d461876..aa250da 100644
--- a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.experiments.pagerank
 
+import java.time.Instant
+
 import akka.actor.Actor.Receive
 
 import org.apache.gearpump.cluster.UserConfig
@@ -39,7 +41,7 @@ class PageRankController(taskContext: TaskContext, conf: UserConfig)
   var weights = Map.empty[TaskId, Double]
   var deltas = Map.empty[TaskId, Double]
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     output(Tick(tick), tasks: _*)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
index 8d163f9..ddd4d1a 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
@@ -18,14 +18,16 @@
 
 package org.apache.gearpump.streaming.examples.complexdag
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
-  override def onStart(startTime: StartTime): Unit = {}
+  override def onStart(startTime: Instant): Unit = {}
 
   override def onNext(msg: Message): Unit = {
     val list = msg.msg.asInstanceOf[Vector[String]]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
index 8dfa565..e9b00a0 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
@@ -18,9 +18,11 @@
 
 package org.apache.gearpump.streaming.examples.complexdag
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 import scala.collection.mutable
 
@@ -28,7 +30,7 @@ class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
 
   var list = mutable.MutableList[String]()
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     list += getClass.getCanonicalName
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
index 0359519..7abb3fc 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
@@ -18,14 +18,16 @@
 
 package org.apache.gearpump.streaming.examples.complexdag
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     self ! Message("start")
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
index 2e4a556..561346e 100644
--- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
@@ -18,19 +18,19 @@
 package org.apache.gearpump.streaming.examples.fsio
 
 import java.io.File
+import java.time.Instant
 import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
 
+import scala.concurrent.duration.FiniteDuration
 import akka.actor.Cancellable
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.SequenceFile._
 import org.apache.hadoop.io.{SequenceFile, Text}
-
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
 import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
   extends Task(taskContext, config) {
@@ -49,7 +49,7 @@ class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
   private var snapShotTime: Long = 0
   private var scheduler: Cancellable = null
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
 
     val fs = FileSystem.get(hadoopConf)
     fs.deleteOnExit(outputPath)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
index 02d2434..4106a2c 100644
--- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
@@ -17,15 +17,16 @@
  */
 package org.apache.gearpump.streaming.examples.fsio
 
+import java.time.Instant
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.SequenceFile._
 import org.apache.hadoop.io.{SequenceFile, Text}
-
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
 import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProducer._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
   extends Task(taskContext, config) {
@@ -34,12 +35,12 @@ class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
 
   val value = new Text()
   val key = new Text()
-  var reader: SequenceFile.Reader = null
+  var reader: SequenceFile.Reader = _
   val hadoopConf = config.hadoopConf
   val fs = FileSystem.get(hadoopConf)
   val inputPath = new Path(config.getString(INPUT_PATH).get)
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
     self ! Start
     LOG.info("sequence file spout initiated")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
index 7831b14..2edb87f 100644
--- a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
@@ -18,6 +18,7 @@
 package org.apache.gearpump.streaming.examples.fsio
 
 import java.io.File
+import java.time.Instant
 import scala.collection.mutable.ArrayBuffer
 
 import akka.actor.ActorSystem
@@ -33,7 +34,7 @@ import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.task.{StartTime, TaskId}
+import org.apache.gearpump.streaming.task.TaskId
 import org.apache.gearpump.streaming.{MockUtil, Processor}
 class SeqFileStreamProcessorSpec
   extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
@@ -67,7 +68,7 @@ class SeqFileStreamProcessorSpec
     when(context.taskId).thenReturn(taskId)
 
     val processor = new SeqFileStreamProcessor(context, conf)
-    processor.onStart(StartTime(0))
+    processor.onStart(Instant.EPOCH)
 
     forAll(kvGenerator) { kv =>
       val (key, value) = kv

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
index ad27e63..a03e68d 100644
--- a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming.examples.fsio
 
+import java.time.Instant
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
@@ -32,7 +34,6 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.MockUtil._
-import org.apache.gearpump.streaming.task.StartTime
 
 class SeqFileStreamProducerSpec
   extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
@@ -73,7 +74,7 @@ class SeqFileStreamProducerSpec
     val context = MockUtil.mockTaskContext
 
     val producer = new SeqFileStreamProducer(context, conf)
-    producer.onStart(StartTime(0))
+    producer.onStart(Instant.EPOCH)
     producer.onNext(Message("start"))
 
     val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
index a95f596..b78e788 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
@@ -18,16 +18,17 @@
 
 package org.apache.gearpump.streaming.examples.kafka.wordcount
 
-import com.twitter.bijection.Injection
+import java.time.Instant
 
+import com.twitter.bijection.Injection
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
   }
 
   override def onNext(msg: Message): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
index 9930b92..58bb884 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
@@ -18,18 +18,19 @@
 
 package org.apache.gearpump.streaming.examples.kafka.wordcount
 
-import com.twitter.bijection.Injection
+import java.time.Instant
 
+import com.twitter.bijection.Injection
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
   private[wordcount] var wordcount = Map.empty[String, Long]
 
-  override def onStart(startTime: StartTime): Unit = {}
+  override def onStart(startTime: Instant): Unit = {}
 
   override def onNext(message: Message): Unit = {
     val word = message.msg.asInstanceOf[String]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
index 3538ece..e37118a 100644
--- a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
+++ b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming.examples.kafka.wordcount
 
+import java.time.Instant
+
 import scala.collection.mutable
 
 import org.mockito.Matchers._
@@ -27,7 +29,6 @@ import org.scalatest.{FlatSpec, Matchers}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 
 class SumSpec extends FlatSpec with Matchers {
 
@@ -39,7 +40,7 @@ class SumSpec extends FlatSpec with Matchers {
     val taskContext = MockUtil.mockTaskContext
 
     val sum = new Sum(taskContext, UserConfig.empty)
-    sum.onStart(StartTime(0))
+    sum.onStart(Instant.EPOCH)
     val str = "once two two three three three"
 
     var totalWordCount = 0

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
index 796b0d2..a16cf4c 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.streaming.examples.sol
 
+import java.time.Instant
 import java.util.concurrent.TimeUnit
 import scala.concurrent.duration.FiniteDuration
 
@@ -25,7 +26,7 @@ import akka.actor.Cancellable
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
@@ -38,7 +39,7 @@ class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig)
   private var snapShotWordCount: Long = 0
   private var snapShotTime: Long = 0
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
       new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount())
     snapShotTime = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
index 84ed038..c1b11e5 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
@@ -18,12 +18,13 @@
 
 package org.apache.gearpump.streaming.examples.sol
 
+import java.time.Instant
 import java.util.Random
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.examples.sol.SOLStreamProducer._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
@@ -36,7 +37,7 @@ class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
   private var rand: Random = null
   private var messageCount: Long = 0
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     prepareRandomMessage
     self ! Start
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
index a6cc966..e3344bf 100644
--- a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming.examples.sol
 
+import java.time.Instant
+
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.{FlatSpec, Matchers}
@@ -24,7 +26,6 @@ import org.scalatest.{FlatSpec, Matchers}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 
 class SOLStreamProcessorSpec extends FlatSpec with Matchers {
 
@@ -33,7 +34,7 @@ class SOLStreamProcessorSpec extends FlatSpec with Matchers {
     val context = MockUtil.mockTaskContext
 
     val sol = new SOLStreamProcessor(context, UserConfig.empty)
-    sol.onStart(StartTime(0))
+    sol.onStart(Instant.EPOCH)
     val msg = Message("msg")
     sol.onNext(msg)
     verify(context, times(1)).output(msg)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
index 2316de8..dc21171 100644
--- a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming.examples.sol
 
+import java.time.Instant
+
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.{Matchers, WordSpec}
@@ -24,7 +26,6 @@ import org.scalatest.{Matchers, WordSpec}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 
 class SOLStreamProducerSpec extends WordSpec with Matchers {
 
@@ -35,7 +36,7 @@ class SOLStreamProducerSpec extends WordSpec with Matchers {
       val context = MockUtil.mockTaskContext
 
       val producer = new SOLStreamProducer(context, conf)
-      producer.onStart(StartTime(0))
+      producer.onStart(Instant.EPOCH)
       producer.onNext(Message("msg"))
       verify(context).output(any[Message])
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
index 0e85f32..134afba 100644
--- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
@@ -18,17 +18,19 @@
 
 package org.apache.gearpump.streaming.examples.state.processor
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
   import taskContext.output
 
   private var num = 0L
-  override def onStart(startTime: StartTime): Unit = {
-    num = startTime.startTime
+  override def onStart(startTime: Instant): Unit = {
+    num = startTime.toEpochMilli
     self ! Message("start")
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
index 6048034..b95d164 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.streaming.examples.state.processor
 
+import java.time.Instant
+
 import scala.concurrent.Await
 import scala.concurrent.duration._
 
@@ -33,7 +35,7 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.state.api.PersistentTask
 import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig}
-import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime}
+import org.apache.gearpump.streaming.task.UpdateCheckpointClock
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 
 class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
@@ -59,7 +61,7 @@ class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
         val appMaster = TestProbe()(system)
         when(taskContext.appMaster).thenReturn(appMaster.ref)
 
-        count.onStart(StartTime(0L))
+        count.onStart(Instant.EPOCH)
         appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L))
 
         for (i <- 0L to num) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
index 2268994..d3f645c 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.streaming.examples.state.processor
 
+import java.time.Instant
+
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 
@@ -30,7 +32,6 @@ import org.scalatest.{Matchers, WordSpec}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 
 class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
   "NumberGeneratorProcessor" should {
@@ -47,7 +48,7 @@ class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
 
       val conf = UserConfig.empty
       val genNum = new NumberGeneratorProcessor(taskContext, conf)
-      genNum.onStart(StartTime(0))
+      genNum.onStart(Instant.EPOCH)
       mockTaskActor.expectMsgType[Message]
 
       genNum.onNext(Message("next"))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
index 0963429..255f869 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.streaming.examples.state.processor
 
+import java.time.Instant
+
 import scala.concurrent.Await
 import scala.concurrent.duration._
 
@@ -34,7 +36,7 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.state.api.PersistentTask
 import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig, WindowConfig}
-import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime}
+import org.apache.gearpump.streaming.task.UpdateCheckpointClock
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 
 class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Matchers {
@@ -61,7 +63,7 @@ class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Match
         val appMaster = TestProbe()(system)
         when(taskContext.appMaster).thenReturn(appMaster.ref)
 
-        windowAverage.onStart(StartTime(0L))
+        windowAverage.onStart(Instant.EPOCH)
         appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L))
 
         for (i <- 0L until num) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/README.md b/examples/streaming/stockcrawler/README.md
deleted file mode 100644
index b51590f..0000000
--- a/examples/streaming/stockcrawler/README.md
+++ /dev/null
@@ -1,19 +0,0 @@
-How to use
-===================
-1. Start local cluster, 
-  ```
-  bin/local
-  ```
-2. Submit the stock crawler
-  ```
-  bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock
-  ```
-  
-  If you are behind  a proxy, you need to set the proxy address
-  ```
-  bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock -proxy host:port
-  ```
-  
-3. Check the UI
-  http://127.0.0.1:8080  
-  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf b/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
deleted file mode 100644
index acee3bd..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
+++ /dev/null
@@ -1,9 +0,0 @@
-gearpump {
-  serializers {
-    "org.apache.gearpump.streaming.examples.stock.StockPrice" = ""
-  }
-}
-
-spray.can {
-  server.parsing.max-content-length = "10M"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png
deleted file mode 100644
index b5c536c..0000000
Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css b/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
deleted file mode 100644
index 182d722..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-.ui-datepicker {
-  font-size: 11px;
-}
-
-.sidebar-label {
-  font-size: 15px;
-  font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-.help {
-  font-size: 12px;
-  font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-div.splitter {
-  margin: 12px 0px 7px 0px;
-  clear: both;
-  border-top: 1px solid #EBEBEB;
-}
-
-input.sidebar {
-  width: 165px
-}
-
-select.sidebar {
-  width: 198px
-}
-
-table.dataintable {
-  font-family: calibri, Arial, Helvetica, sans-serif;
-  font-size: 15px;
-  margin-top: 10px;
-  border-collapse: collapse;
-  border: 1px solid #888;
-}
-
-table.dataintable th {
-  vertical-align: baseline;
-  padding: 5px 15px 5px 5px;
-  background-color: #EEE;
-  border: 1px solid #888;
-  text-align: left;
-}
-
-table.dataintable td {
-  vertical-align: text-top;
-  padding: 5px 15px 5px 5px;
-  background-color: #FFFFFF;
-  border: 1px solid #AAA;
-}
-
-#search {
-  width: 100px;
-  height: 25px;
-  position: relative;
-  left: 0px;
-  top: 5px;
-}
-
-#mytable {
-  width: 100%;
-  height: 300;
-  float: left;
-}
-
-#mychart {
-  height: 250px;
-  width: 100%;
-}
-
-#Menu {
-  height: 100%;
-  width: 245px;
-  float: left;
-}
-
-#header {
-  height: 115px;
-  background-image: url(header.png);
-}
-
-#body {
-  height: 100%;
-  width: 100%;
-  background-image: url(body.png);
-  background-size: 100% 100%;
-}
-
-#footer {
-  color: white;
-  height: 70px;
-  line-height: 70px;
-  text-align: middle;
-  clear: both;
-  text-align: center;
-  background-image: url(foot.png);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png
deleted file mode 100644
index 5db91b5..0000000
Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png
deleted file mode 100644
index 9284e44..0000000
Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js b/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
deleted file mode 100644
index 97f1e07..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-function initChart(chartid, tableid, stockId) {
-  require.config({
-    paths: {
-      echarts: 'http://echarts.baidu.com/build/dist'
-    }
-  });
-
-  require(
-    [
-      'echarts',
-      'echarts/chart/line'
-    ],
-
-    function (ec) {
-      // \u57fa\u4e8e\u51c6\u5907\u597d\u7684dom\uff0c\u521d\u59cb\u5316echarts\u56fe\u8868
-      var myChart = ec.init(document.getElementById(chartid));
-      var dataPoints = 100;
-      var timeTicket;
-      clearInterval(timeTicket);
-      timeTicket = setInterval(function () {
-        $.getJSON("report/" + stockId, function (json) {
-          STOCK_NAME = json.name
-
-          var maxDrawnDown = json.currentMax[0].max.price - json.currentMax[0].min.price;
-          var time = new Date(json.currentMax[0].current.timestamp).toLocaleTimeString().replace(/^\D*/, '');
-          // \u52a8\u6001\u6570\u636e\u63a5\u53e3 addData
-          myChart.addData([
-            [
-              0,        // \u7cfb\u5217\u7d22\u5f15
-              maxDrawnDown.toFixed(2), // \u65b0\u589e\u6570\u636e
-              false,     // \u65b0\u589e\u6570\u636e\u662f\u5426\u4ece\u961f\u5217\u5934\u90e8\u63d2\u5165
-              false,     // \u662f\u5426\u589e\u52a0\u961f\u5217\u957f\u5ea6\uff0cfalse\u5219\u81ea\u5b9a\u5220\u9664\u539f\u6709\u6570\u636e\uff0c\u961f\u5934\u63d2\u5165\u5220\u961f\u5c3e\uff0c\u961f\u5c3e\u63d2\u5165\u5220\u961f\u5934
-              time
-            ],
-            [
-              1,        // \u7cfb\u5217\u7d22\u5f15
-              json.currentMax[0].current.price.toFixed(2), // \u65b0\u589e\u6570\u636e
-              false,     // \u65b0\u589e\u6570\u636e\u662f\u5426\u4ece\u961f\u5217\u5934\u90e8\u63d2\u5165
-              false,     // \u662f\u5426\u589e\u52a0\u961f\u5217\u957f\u5ea6\uff0cfalse\u5219\u81ea\u5b9a\u5220\u9664\u539f\u6709\u6570\u636e\uff0c\u961f\u5934\u63d2\u5165\u5220\u961f\u5c3e\uff0c\u961f\u5c3e\u63d2\u5165\u5220\u961f\u5934
-              time
-            ]
-          ]);
-          document.getElementById(chartid).style.display = "block"
-          document.getElementById(tableid).innerHTML = "<pre>" + JSON.stringify(json, null, 2) + "</pre>"
-        });
-      }, 2000);
-
-      var subtext_ = "Draw Down"
-
-      var option = {
-        title: {
-          text: 'Stock Analysis',
-          subtext: "Max " + subtext_
-        },
-        tooltip: {
-          trigger: 'axis'
-        },
-        legend: {
-          data: ["Current Price", "Current Draw Down"]
-        },
-        toolbox: {
-          show: false,
-          feature: {
-            mark: {show: true},
-            dataView: {show: true, readOnly: false},
-            magicType: {show: true, type: ['line', 'bar']},
-            restore: {show: true},
-            saveAsImage: {show: true}
-          }
-        },
-        dataZoom: {
-          show: false,
-          start: 0,
-          end: 100
-        },
-        xAxis: [
-          {
-            type: 'category',
-            boundaryGap: true,
-            data: (function () {
-              var now = new Date();
-              var res = [];
-              var len = dataPoints;
-              while (len--) {
-                res.unshift(now.toLocaleTimeString().replace(/^\D*/, ''));
-                now = new Date(now - 2000);
-              }
-              return res;
-            })()
-          }
-        ],
-        yAxis: [
-          {
-            type: 'value',
-            scale: true,
-            name: subtext_ + ' \u4ef7\u683c/\u5143',
-            boundaryGap: [0, 0.3]
-          },
-          {
-            type: 'value',
-            scale: true,
-            name: 'Current \u4ef7\u683c/\u5143',
-            boundaryGap: [0, 0.1]
-          }
-        ],
-        series: [
-          {
-            name: "Current Draw Down",
-            type: 'line',
-            data: (function () {
-              var res = [];
-              var len = dataPoints;
-              while (len--) {
-                res.push(0);
-              }
-              return res;
-            })()
-          },
-          {
-            name: "Current Price",
-            type: 'line',
-            yAxisIndex: 1,
-            data: (function () {
-              var res = [];
-              var len = dataPoints;
-              while (len--) {
-                res.push(0);
-              }
-              return res;
-            })()
-          }
-        ]
-      };
-
-      // \u4e3aecharts\u5bf9\u8c61\u52a0\u8f7d\u6570\u636e
-      myChart.setOption(option);
-    }
-  );
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html b/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
deleted file mode 100644
index 9682a53..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
+++ /dev/null
@@ -1,87 +0,0 @@
-<!DOCTYPE html>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~      http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<html>
-
-<head>
-  <meta charset="utf-8">
-  <link rel=stylesheet type=text/css href="css/custom.css">
-  <script src="http://echarts.baidu.com/build/dist/echarts.js"></script>
-  <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
-  <script src="js/stock.js"></script>
-  <script type="text/javascript">
-    function search_onclick() {
-      var stockId = document.getElementById('stockId').value
-      initChart("mychart", "mytable", stockId)
-    }
-  </script>
-</head>
-
-<body style="background-color:#F2F2F2">
-<div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;">
-  <div style="height:0px"></div>
-  <div id="header">
-    <div
-      style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
-      Big Data Stock Analysis Demo
-    </div>
-  </div>
-  <div id="body">
-    <div id="Menu">
-      <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;">
-        <!-- form to post to accompany to get accompanying cars -->
-
-        <table style="width:100%">
-          <tr>
-            <td class="sidebar-label">Stock Id:</td>
-          </tr>
-          <tr>
-            <td class="sidebar-label">Example: sh600019, sz000002</td>
-          </tr>
-          <tr>
-            <td style="vertical-align:top;">
-              <input id="stockId" class="sidebar" type="text" name="stockId"/>
-            </td>
-          </tr>
-        </table>
-        <div class="splitter"></div>
-        <div>
-          <button id="search" onclick="search_onclick()">Search</button>
-        </div>
-      </div>
-    </div>
-    <div id="content"
-         style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
-      <div
-        style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
-        Analysis Result:
-      </div>
-      <div style="height:7px;background-color:#92BDF2;"></div>
-
-      <div id="mychart"></div>
-
-      <div id="mytable"></div>
-    </div>
-  </div>
-  <div id="footer">
-    Big Data Team @ Intel
-  </div>
-</div>
-</body>
-</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
deleted file mode 100644
index f6fdff2..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import scala.collection.immutable
-
-import akka.actor.Actor.Receive
-import org.joda.time.DateTime
-import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.examples.stock.Analyzer.HistoricalStates
-import org.apache.gearpump.streaming.examples.stock.Price._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Dradown analyzer
- * Definition: http://en.wikipedia.org/wiki/Drawdown_(economics)
- */
-class Analyzer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
-  val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
-
-  private var stocksToReport = immutable.Set.empty[String]
-  private var stockInfos = new immutable.HashMap[String, StockPrice]
-
-  private var currentDownwardsStates = new immutable.HashMap[String, StockPriceState]
-  private val historicalStates = new HistoricalStates()
-  private var latestTimeStamp: Long = 0L
-
-  override def onStart(startTime: StartTime): Unit = {
-    LOG.info("analyzer is started")
-  }
-
-  override def onNext(msg: Message): Unit = {
-    msg.msg match {
-      case stock: StockPrice =>
-        latestTimeStamp = stock.timestamp
-        checkDate(stock)
-        stockInfos += stock.stockId -> stock
-        val downwardsState = updateCurrentStates(stock)
-        val maxDrawdown = historicalStates.updatePresentMaximal(downwardsState)
-    }
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case get@GetReport(stockId, date) =>
-      var currentMax = currentDownwardsStates.get(stockId)
-
-      val dateTime = Option(date) match {
-        case Some(date) =>
-          currentMax = None
-          parseDate(dateFormatter, date)
-        case None =>
-          new DateTime(latestTimeStamp).withTimeAtStartOfDay
-      }
-
-      val historyMax = Option(dateTime).flatMap(handleHistoricalQuery(stockId, _))
-      val name = stockInfos.get(stockId).map(_.name).getOrElse("")
-      sender ! Report(stockId, name, dateTime.toString, historyMax, currentMax)
-  }
-
-  private def updateCurrentStates(stock: StockPrice) = {
-    var downwardsState: StockPriceState = null
-    if (currentDownwardsStates.contains(stock.stockId)) {
-      downwardsState = generateNewState(stock, currentDownwardsStates.get(stock.stockId).get)
-    } else {
-      downwardsState = StockPriceState(stock.stockId, stock, stock, stock)
-    }
-    currentDownwardsStates += stock.stockId -> downwardsState
-    downwardsState
-  }
-
-  // Update the stock's latest state.
-  private def generateNewState(currentPrice: Price, oldState: StockPriceState): StockPriceState = {
-    if (currentPrice.price > oldState.max.price) {
-      StockPriceState(oldState.stockID, currentPrice, currentPrice, currentPrice)
-    } else {
-      val newState = StockPriceState(oldState.stockID, oldState.max,
-        Price.min(currentPrice, oldState.min), currentPrice)
-      newState
-    }
-  }
-
-  private def checkDate(stock: StockPrice) = {
-    if (currentDownwardsStates.contains(stock.stockId)) {
-      val now = new DateTime(stock.timestamp)
-      val lastTime = new DateTime(currentDownwardsStates.get(stock.stockId).get.current.timestamp)
-      // New day
-      if (now.getDayOfYear > lastTime.getDayOfYear || now.getYear > lastTime.getYear) {
-        currentDownwardsStates -= stock.stockId
-      }
-    }
-  }
-
-  private def parseDate(format: DateTimeFormatter, input: String): DateTime = {
-    format.parseDateTime(input)
-  }
-
-  private def handleHistoricalQuery(stockId: String, date: DateTime) = {
-    val maximal = historicalStates.getHistoricalMaximal(stockId, date)
-    maximal
-  }
-}
-
-object Analyzer {
-
-  class HistoricalStates {
-    val LOG = LogUtil.getLogger(getClass)
-    val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
-    private var historicalMaxRaise = new immutable.HashMap[(String, DateTime), StockPriceState]
-    private var historicalMaxDrawdown = new immutable.HashMap[(String, DateTime), StockPriceState]
-
-    def updatePresentMaximal(newState: StockPriceState): Option[StockPriceState] = {
-      val date = Analyzer.getDateFromTimeStamp(newState.current.timestamp)
-      var newMaximalState: Option[StockPriceState] = null
-      if (newState.max.price < Float.MinPositiveValue) {
-        newMaximalState = generateNewMaximal(newState, date, historicalMaxRaise)
-        if (newMaximalState.nonEmpty) {
-          historicalMaxRaise += (newState.stockID, date) -> newMaximalState.get
-        }
-      } else {
-        newMaximalState = generateNewMaximal(newState, date, historicalMaxDrawdown)
-        if (newMaximalState.nonEmpty) {
-          historicalMaxDrawdown += (newState.stockID, date) -> newMaximalState.get
-        }
-      }
-      newMaximalState
-    }
-
-    def getHistoricalMaximal(stockId: String, date: DateTime): Option[StockPriceState] = {
-      historicalMaxDrawdown.get((stockId, date))
-    }
-
-    private def generateNewMaximal(
-        state: StockPriceState,
-        date: DateTime,
-        map: immutable.HashMap[(String, DateTime), StockPriceState])
-      : Option[StockPriceState] = {
-      val maximal = map.get((state.stockID, date))
-      if (maximal.nonEmpty && maximal.get.drawDown > state.drawDown) {
-        None
-      } else {
-        Some(state)
-      }
-    }
-  }
-
-  def getDateFromTimeStamp(timestamp: Long): DateTime = {
-    new DateTime(timestamp).withTimeAtStartOfDay()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
deleted file mode 100644
index bb444dd..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import scala.concurrent.duration._
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Crawler(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
-  import taskContext._
-
-  val FetchStockPrice = Message("FetchStockPrice")
-
-  lazy val stocks = {
-    val stockIds = conf.getValue[Array[String]]("StockId").get
-    val size = if (stockIds.length % parallelism > 0) {
-      stockIds.length / parallelism + 1
-    } else {
-      stockIds.length / parallelism
-    }
-
-    val start = taskId.index * size
-    val end = (taskId.index + 1) * size
-    stockIds.slice(start, end)
-  }
-
-  scheduleOnce(1.seconds)(self ! FetchStockPrice)
-
-  val stockMarket = conf.getValue[StockMarket](classOf[StockMarket].getName).get
-
-  override def onStart(startTime: StartTime): Unit = {
-    // Nothing
-  }
-
-  override def onNext(msg: Message): Unit = {
-    stockMarket.getPrice(stocks).foreach { price =>
-      output(new Message(price, price.timestamp))
-    }
-    scheduleOnce(5.seconds)(self ! FetchStockPrice)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
deleted file mode 100644
index 94a85ff..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-// scalastyle:off equals.hash.code  case class has equals defined
-case class StockPrice(
-    stockId: String, name: String, price: String, delta: String, pecent: String, volume: String,
-    money: String, timestamp: Long) {
-  override def hashCode: Int = stockId.hashCode
-}
-// scalastyle:on equals.hash.code  case class has equals defined
-
-case class Price(price: Float, timestamp: Long)
-
-object Price {
-
-  import scala.language.implicitConversions
-
-  implicit def StockPriceToPrice(stock: StockPrice): Price = {
-    Price(stock.price.toFloat, stock.timestamp)
-  }
-
-  def min(first: Price, second: Price): Price = {
-    if (first.price < second.price) {
-      first
-    } else {
-      second
-    }
-  }
-}
-
-case class StockPriceState(stockID: String, max: Price, min: Price, current: Price) {
-
-  def drawDownPeriod: Long = min.timestamp - max.timestamp
-
-  def recoveryPeriod: Long = current.timestamp - min.timestamp
-
-  def drawDown: Float = max.price - min.price
-}
-
-case class GetReport(stockId: String, date: String)
-
-case class Report(
-    stockId: String, name: String, date: String, historyMax: Option[StockPriceState],
-    currentMax: Option[StockPriceState])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
deleted file mode 100644
index 01ccb3e..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-import akka.actor.Actor._
-import akka.actor.{Actor, ActorRefFactory, Props}
-import akka.io.IO
-import akka.pattern.ask
-import spray.can.Http
-import spray.http.StatusCodes
-import spray.json._
-import spray.routing.{HttpService, Route}
-import upickle.default.write
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterDataDetailRequest
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary}
-import org.apache.gearpump.streaming.examples.stock.QueryServer.WebServer
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-
-class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import scala.concurrent.ExecutionContext.Implicits.global
-
-  import taskContext.{appId, appMaster}
-
-  var analyzer: (ProcessorId, ProcessorSummary) = null
-  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-
-  override def onStart(startTime: StartTime): Unit = {
-    appMaster ! AppMasterDataDetailRequest(appId)
-    taskContext.actorOf(Props(new WebServer))
-  }
-
-  override def onNext(msg: Message): Unit = {
-    // Skip
-  }
-
-  override def receiveUnManagedMessage: Receive = messageHandler
-
-  def messageHandler: Receive = {
-    case detail: StreamAppMasterSummary =>
-      analyzer = detail.processors.find { kv =>
-        val (processorId, processor) = kv
-        processor.taskClass == classOf[Analyzer].getName
-      }.get
-    case getReport@GetReport(stockId, date) =>
-      val parallism = analyzer._2.parallelism
-      val processorId = analyzer._1
-      val analyzerTaskId = TaskId(processorId, (stockId.hashCode & Integer.MAX_VALUE) % parallism)
-      val requester = sender
-      import scala.concurrent.Future
-      (appMaster ? LookupTaskActorRef(analyzerTaskId))
-        .asInstanceOf[Future[TaskActorRef]].flatMap { task =>
-
-        (task.task ? getReport).asInstanceOf[Future[Report]]
-      }.map { report =>
-        LOG.info(s"reporting $report")
-        requester ! report
-      }
-    case _ =>
-    // Ignore
-  }
-}
-
-object QueryServer {
-  class WebServer extends Actor with HttpService {
-
-    import context.dispatcher
-    implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-    def actorRefFactory: ActorRefFactory = context
-    implicit val system = context.system
-
-    IO(Http) ! Http.Bind(self, interface = "localhost", port = 8080)
-
-    override def receive: Receive = runRoute(webServer ~ staticRoute)
-
-    def webServer: Route = {
-      path("report" / Segment) { stockId =>
-        get {
-          onComplete((context.parent ? GetReport(stockId, null)).asInstanceOf[Future[Report]]) {
-            case Success(report: Report) =>
-              val json = write(report)
-              complete(pretty(json))
-            case Failure(ex) => complete(StatusCodes.InternalServerError,
-              s"An error occurred: ${ex.getMessage}")
-          }
-        }
-      }
-    }
-
-    val staticRoute = {
-      pathEndOrSingleSlash {
-        getFromResource("stock/stock.html")
-      } ~
-      pathPrefix("css") {
-        get {
-          getFromResourceDirectory("stock/css")
-        }
-      } ~
-      pathPrefix("js") {
-        get {
-          getFromResourceDirectory("stock/js")
-        }
-      }
-    }
-
-    private def pretty(json: String): String = {
-      json.parseJson.prettyPrint
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
deleted file mode 100644
index 24e050b..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import java.nio.charset.Charset
-import scala.io.Codec
-
-import org.apache.commons.httpclient.methods.GetMethod
-import org.apache.commons.httpclient.{HttpClient, MultiThreadedHttpConnectionManager}
-import org.htmlcleaner.{HtmlCleaner, TagNode}
-import org.joda.time.{DateTime, DateTimeZone}
-
-import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.LogUtil
-
-class StockMarket(service: ServiceHour, proxy: HostPort = null) extends Serializable {
-
-  private def LOG = LogUtil.getLogger(getClass)
-
-  @transient
-  private var connectionManager: MultiThreadedHttpConnectionManager = null
-
-  private val eastMoneyStockPage = "http://quote.eastmoney.com/stocklist.html"
-
-  private val stockPriceParser =
-    """^var\shq_str_s_([a-z0-9A-Z]+)="([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)";$""".r
-
-  def shutdown(): Unit = {
-    Option(connectionManager).map(_.shutdown())
-  }
-
-  @transient
-  private var _client: HttpClient = null
-
-  private def client: HttpClient = {
-    _client = Option(_client).getOrElse {
-      val connectionManager = new MultiThreadedHttpConnectionManager()
-      val client = new HttpClient(connectionManager)
-      Option(proxy).map(host => client.getHostConfiguration().setProxy(host.host, host.port))
-      client
-    }
-    _client
-  }
-
-  def getPrice(stocks: Array[String]): Array[StockPrice] = {
-
-    LOG.info(s"getPrice 1")
-
-    val query = "http://hq.sinajs.cn/list=" + stocks.map("s_" + _).mkString(",")
-    if (service.inService) {
-
-      LOG.info(s"getPrice 2")
-
-      val get = new GetMethod(query)
-      client.executeMethod(get)
-      val current = System.currentTimeMillis()
-
-      val output = scala.io.Source.fromInputStream(get.getResponseBodyAsStream)(
-        new Codec(Charset forName "GBK")).getLines().flatMap { line =>
-        line match {
-          case stockPriceParser(stockId, name, price, delta, pecent, volume, money) =>
-            Some(StockPrice(stockId, name, price, delta, pecent, volume, money, current))
-          case _ =>
-            None
-        }
-      }.toArray
-
-      LOG.info(s"getPrice 3 ${output.length}")
-
-      output
-    } else {
-      Array.empty[StockPrice]
-    }
-  }
-
-  private val urlPattern = """^.*/([a-zA-Z0-9]+)\.html$""".r
-
-  def getStockIdList: Array[String] = {
-    val cleaner = new HtmlCleaner
-    val props = cleaner.getProperties
-
-    val get = new GetMethod(eastMoneyStockPage)
-    client.executeMethod(get)
-
-    val root = cleaner.clean(get.getResponseBodyAsStream)
-
-    val stockUrls = root.evaluateXPath("//div[@id='quotesearch']//li//a[@href]")
-
-    val elements = root.getElementsByName("a", true)
-
-    val hrefs = (0 until stockUrls.length)
-      .map(stockUrls(_).asInstanceOf[TagNode].getAttributeByName("href"))
-      .map { url =>
-        url match {
-          case urlPattern(code) => code
-          case _ => null
-        }
-      }.toArray
-    hrefs
-  }
-}
-
-object StockMarket {
-
-  class ServiceHour(all: Boolean) extends Serializable {
-
-    /**
-     * Morning openning: 9:30 am - 11:30 am
-     */
-    val morningStart = GMT8(new DateTime(0, 1, 1, 9, 30)).getMillis
-    val morningEnd = GMT8(new DateTime(0, 1, 1, 11, 30)).getMillis
-
-    /**
-     * After noon openning: 13:00 pm - 15:00 pm
-     */
-    val afternoonStart = GMT8(new DateTime(0, 1, 1, 13, 0)).getMillis
-    val afternoonEnd = GMT8(new DateTime(0, 1, 1, 15, 0)).getMillis
-
-    def inService: Boolean = {
-
-      if (all) {
-        true
-      } else {
-        val now = GMT8(DateTime.now()).withDate(0, 1, 1).getMillis
-        if (now >= morningStart && now <= morningEnd ||
-          now >= afternoonStart && now <= afternoonEnd) {
-          true
-        } else {
-          false
-        }
-      }
-    }
-
-    private def GMT8(time: DateTime): DateTime = {
-      time.withZone(DateTimeZone.UTC).plusHours(8)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
deleted file mode 100644
index 6d17c20..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock.main
-
-import akka.actor.ActorSystem
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
-import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour
-import org.apache.gearpump.streaming.examples.stock.{Analyzer, Crawler, QueryServer, StockMarket}
-import org.apache.gearpump.streaming.{Processor, StreamApplication}
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.Graph.Node
-import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
-
-/** Tracks the China's stock market index change */
-object Stock extends AkkaApp with ArgumentsParser {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "crawler" -> CLIOption[Int]("<how many fetcher to get data from remote>",
-      required = false, defaultValue = Some(10)),
-    "analyzer" -> CLIOption[Int]("<parallism of analyzer>",
-      required = false, defaultValue = Some(1)),
-    "proxy" -> CLIOption[String]("proxy setting host:port, for example: 127.0.0.1:8443",
-      required = false, defaultValue = Some("")))
-
-  def crawler(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
-    val crawler = Processor[Crawler](config.getInt("crawler"))
-    val analyzer = Processor[Analyzer](config.getInt("analyzer"))
-    val queryServer = Processor[QueryServer](1)
-
-    val proxySetting = config.getString("proxy")
-    val proxy = if (proxySetting.isEmpty) {
-      null
-    } else HostPort(proxySetting)
-    val stockMarket = new StockMarket(new ServiceHour(true), proxy)
-    val stocks = stockMarket.getStockIdList
-
-    // scalastyle:off println
-    Console.println(s"Successfully fetched stock id for ${stocks.length} stocks")
-    // scalastyle:on println
-
-    val userConfig = UserConfig.empty.withValue("StockId", stocks)
-      .withValue[StockMarket](classOf[StockMarket].getName, stockMarket)
-    val partitioner = new HashPartitioner
-
-    val p1 = crawler ~ partitioner ~> analyzer
-    val p2 = Node(queryServer)
-    val graph = Graph(p1, p2)
-    val app = StreamApplication("stock_direct_analyzer", graph, userConfig
-    )
-    app
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-    val context = ClientContext(akkaConf)
-
-    implicit val system = context.system
-
-    val app = crawler(config)
-    val appId = context.submit(app)
-    context.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/README.md b/examples/streaming/transport/README.md
deleted file mode 100644
index fc9bdfe..0000000
--- a/examples/streaming/transport/README.md
+++ /dev/null
@@ -1,3 +0,0 @@
-What is this?
-=============
-A smart transportation example which simulate a city with millions of cars.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/geardefault.conf b/examples/streaming/transport/src/main/resources/geardefault.conf
deleted file mode 100644
index 0c8f421..0000000
--- a/examples/streaming/transport/src/main/resources/geardefault.conf
+++ /dev/null
@@ -1,12 +0,0 @@
-gearpump {
-
-  serializers {
-    ## Follow this format when adding new serializer for new message types
-    ##    "org.apache.gearpump.Message" = "org.apache.gearpump.streaming.MessageSerializer"
-    "org.apache.gearpump.streaming.examples.transport.PassRecord" = ""
-  }
-}
-
-spray.can {
-  server.parsing.max-content-length = "10M"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/body.png
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/body.png b/examples/streaming/transport/src/main/resources/transport/css/body.png
deleted file mode 100644
index b5c536c..0000000
Binary files a/examples/streaming/transport/src/main/resources/transport/css/body.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/custom.css
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/custom.css b/examples/streaming/transport/src/main/resources/transport/css/custom.css
deleted file mode 100644
index f324b6a..0000000
--- a/examples/streaming/transport/src/main/resources/transport/css/custom.css
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-.ui-datepicker {
-  font-size: 11px;
-}
-
-.sidebar-label {
-  font-size: 15px;
-  font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-.help {
-  font-size: 12px;
-  font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-div.splitter {
-  margin: 12px 0px 7px 0px;
-  clear: both;
-  border-top: 1px solid #EBEBEB;
-}
-
-input.sidebar {
-  width: 165px
-}
-
-select.sidebar {
-  width: 198px
-}
-
-table.dataintable {
-  font-family: calibri, Arial, Helvetica, sans-serif;
-  font-size: 15px;
-  margin-top: 10px;
-  border-collapse: collapse;
-  border: 1px solid #888;
-}
-
-table.dataintable th {
-  vertical-align: baseline;
-  padding: 5px 15px 5px 5px;
-  background-color: #EEE;
-  border: 1px solid #888;
-  text-align: left;
-}
-
-table.dataintable td {
-  vertical-align: text-top;
-  padding: 5px 15px 5px 5px;
-  background-color: #FFFFFF;
-  border: 1px solid #AAA;
-}
-
-#search {
-  width: 100px;
-  height: 25px;
-  position: relative;
-  left: 0px;
-  top: 5px;
-}
-
-#mytable {
-  width: 100%;
-  height: 300;
-  float: left;
-}
-
-#mychart {
-  height: 400px;
-  width: 100%;
-}
-
-#Menu {
-  height: 100%;
-  width: 245px;
-  float: left;
-}
-
-#header {
-  height: 115px;
-  background-image: url(header.png);
-}
-
-#body {
-  height: 100%;
-  width: 100%;
-  background-image: url(body.png);
-  background-size: 100% 100%;
-}
-
-#footer {
-  color: white;
-  height: 70px;
-  line-height: 70px;
-  text-align: middle;
-  clear: both;
-  text-align: center;
-  background-image: url(foot.png);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/foot.png
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/foot.png b/examples/streaming/transport/src/main/resources/transport/css/foot.png
deleted file mode 100644
index 5db91b5..0000000
Binary files a/examples/streaming/transport/src/main/resources/transport/css/foot.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/header.png
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/header.png b/examples/streaming/transport/src/main/resources/transport/css/header.png
deleted file mode 100644
index 9284e44..0000000
Binary files a/examples/streaming/transport/src/main/resources/transport/css/header.png and /dev/null differ