You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/08/15 15:04:34 UTC
[3/3] incubator-gearpump git commit: [GEARPUMP-188] use java.time.Instant for Task start time
[GEARPUMP-188] use java.time.Instant for Task start time
Author: manuzhang <ow...@gmail.com>
Closes #74 from manuzhang/replace_timestamp_with_instant.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/23daf0cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/23daf0cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/23daf0cf
Branch: refs/heads/master
Commit: 23daf0cf9c1db3fabc1b679993fcf1d6edb43e7d
Parents: 6d919ec
Author: manuzhang <ow...@gmail.com>
Authored: Mon Aug 15 23:04:11 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Aug 15 23:04:11 2016 +0800
----------------------------------------------------------------------
.../pagerank/PageRankController.scala | 4 +-
.../streaming/examples/complexdag/Node.scala | 6 +-
.../streaming/examples/complexdag/Sink.scala | 6 +-
.../streaming/examples/complexdag/Source.scala | 6 +-
.../examples/fsio/SeqFileStreamProcessor.scala | 8 +-
.../examples/fsio/SeqFileStreamProducer.scala | 9 +-
.../fsio/SeqFileStreamProcessorSpec.scala | 5 +-
.../fsio/SeqFileStreamProducerSpec.scala | 5 +-
.../examples/kafka/wordcount/Split.scala | 7 +-
.../examples/kafka/wordcount/Sum.scala | 7 +-
.../examples/kafka/wordcount/SumSpec.scala | 5 +-
.../examples/sol/SOLStreamProcessor.scala | 5 +-
.../examples/sol/SOLStreamProducer.scala | 5 +-
.../examples/sol/SOLStreamProcessorSpec.scala | 5 +-
.../examples/sol/SOLStreamProducerSpec.scala | 5 +-
.../processor/NumberGeneratorProcessor.scala | 8 +-
.../state/processor/CountProcessorSpec.scala | 6 +-
.../NumberGeneratorProcessorSpec.scala | 5 +-
.../processor/WindowAverageProcessorSpec.scala | 6 +-
examples/streaming/stockcrawler/README.md | 19 --
.../src/main/resources/geardefault.conf | 9 -
.../src/main/resources/stock/css/body.png | Bin 201 -> 0 bytes
.../src/main/resources/stock/css/custom.css | 115 -----------
.../src/main/resources/stock/css/foot.png | Bin 2250 -> 0 bytes
.../src/main/resources/stock/css/header.png | Bin 17350 -> 0 bytes
.../src/main/resources/stock/js/stock.js | 157 ---------------
.../src/main/resources/stock/stock.html | 87 --------
.../streaming/examples/stock/Analyzer.scala | 170 ----------------
.../streaming/examples/stock/Crawler.scala | 60 ------
.../streaming/examples/stock/Data.scala | 61 ------
.../streaming/examples/stock/QueryServer.scala | 134 -------------
.../streaming/examples/stock/StockMarket.scala | 155 ---------------
.../streaming/examples/stock/main/Stock.scala | 86 --------
examples/streaming/transport/README.md | 3 -
.../src/main/resources/geardefault.conf | 12 --
.../src/main/resources/transport/css/body.png | Bin 201 -> 0 bytes
.../src/main/resources/transport/css/custom.css | 115 -----------
.../src/main/resources/transport/css/foot.png | Bin 2250 -> 0 bytes
.../src/main/resources/transport/css/header.png | Bin 17350 -> 0 bytes
.../main/resources/transport/js/transport.js | 180 -----------------
.../main/resources/transport/svg/beijing.svg | 199 -------------------
.../src/main/resources/transport/transport.html | 88 --------
.../streaming/examples/transport/Data.scala | 32 ---
.../examples/transport/DataSource.scala | 56 ------
.../examples/transport/QueryServer.scala | 154 --------------
.../examples/transport/Transport.scala | 69 -------
.../examples/transport/VelocityInspector.scala | 123 ------------
.../examples/transport/generator/MockCity.scala | 88 --------
.../generator/PassRecordGenerator.scala | 69 -------
.../examples/transport/DataSourceSpec.scala | 45 -----
.../examples/transport/TransportSpec.scala | 69 -------
.../transport/generator/MockCitySpec.scala | 31 ---
.../generator/PassRecordGeneratorSpec.scala | 34 ----
.../streaming/examples/wordcountjava/Split.java | 5 +-
.../streaming/examples/wordcountjava/Sum.java | 4 +-
.../streaming/examples/wordcount/Split.scala | 5 +-
.../streaming/examples/wordcount/Sum.scala | 5 +-
.../examples/wordcount/SplitSpec.scala | 7 +-
.../streaming/examples/wordcount/SumSpec.scala | 5 +-
.../storm/processor/StormProcessor.scala | 3 +-
.../storm/producer/StormProducer.scala | 3 +-
.../storm/topology/GearpumpStormComponent.scala | 9 +-
.../storm/processor/StormProcessorSpec.scala | 5 +-
.../storm/producer/StormProducerSpec.scala | 5 +-
.../topology/GearpumpStormComponentSpec.scala | 9 +-
.../topology/GearpumpStormTopologySpec.scala | 1 -
.../kafka/lib/source/AbstractKafkaSource.scala | 5 +-
.../streaming/kafka/KafkaSourceSpec.scala | 10 +-
.../apache/gearpump/streaming/javaapi/Task.java | 5 +-
.../gearpump/streaming/dsl/StreamApp.scala | 32 +--
.../streaming/dsl/plan/OpTranslator.scala | 24 +--
.../gearpump/streaming/sink/DataSinkTask.scala | 7 +-
.../gearpump/streaming/source/DataSource.scala | 6 +-
.../streaming/source/DataSourceTask.scala | 8 +-
.../streaming/state/api/PersistentTask.scala | 8 +-
.../gearpump/streaming/task/StartTime.scala | 24 ---
.../apache/gearpump/streaming/task/Task.scala | 6 +-
.../gearpump/streaming/task/TaskActor.scala | 7 +-
.../gearpump/streaming/task/TaskWrapper.scala | 12 +-
.../streaming/appmaster/AppMasterSpec.scala | 10 +-
.../streaming/appmaster/TaskManagerSpec.scala | 4 +-
.../streaming/appmaster/TaskSchedulerSpec.scala | 11 +-
.../gearpump/streaming/dsl/StreamSpec.scala | 4 +-
.../streaming/dsl/plan/OpTranslatorSpec.scala | 12 +-
.../streaming/sink/DataSinkTaskSpec.scala | 8 +-
.../streaming/source/DataSourceTaskSpec.scala | 7 +-
.../streaming/task/SubscriptionSpec.scala | 7 +-
87 files changed, 199 insertions(+), 2617 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
index d461876..aa250da 100644
--- a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.experiments.pagerank
+import java.time.Instant
+
import akka.actor.Actor.Receive
import org.apache.gearpump.cluster.UserConfig
@@ -39,7 +41,7 @@ class PageRankController(taskContext: TaskContext, conf: UserConfig)
var weights = Map.empty[TaskId, Double]
var deltas = Map.empty[TaskId, Double]
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
output(Tick(tick), tasks: _*)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
index 8d163f9..ddd4d1a 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
@@ -18,14 +18,16 @@
package org.apache.gearpump.streaming.examples.complexdag
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
- override def onStart(startTime: StartTime): Unit = {}
+ override def onStart(startTime: Instant): Unit = {}
override def onNext(msg: Message): Unit = {
val list = msg.msg.asInstanceOf[Vector[String]]
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
index 8dfa565..e9b00a0 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
@@ -18,9 +18,11 @@
package org.apache.gearpump.streaming.examples.complexdag
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
import scala.collection.mutable
@@ -28,7 +30,7 @@ class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
var list = mutable.MutableList[String]()
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
list += getClass.getCanonicalName
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
index 0359519..7abb3fc 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
@@ -18,14 +18,16 @@
package org.apache.gearpump.streaming.examples.complexdag
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
self ! Message("start")
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
index 2e4a556..561346e 100644
--- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
@@ -18,19 +18,19 @@
package org.apache.gearpump.streaming.examples.fsio
import java.io.File
+import java.time.Instant
import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.FiniteDuration
import akka.actor.Cancellable
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile._
import org.apache.hadoop.io.{SequenceFile, Text}
-
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
extends Task(taskContext, config) {
@@ -49,7 +49,7 @@ class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
private var snapShotTime: Long = 0
private var scheduler: Cancellable = null
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
val fs = FileSystem.get(hadoopConf)
fs.deleteOnExit(outputPath)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
index 02d2434..4106a2c 100644
--- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
@@ -17,15 +17,16 @@
*/
package org.apache.gearpump.streaming.examples.fsio
+import java.time.Instant
+
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile._
import org.apache.hadoop.io.{SequenceFile, Text}
-
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProducer._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
extends Task(taskContext, config) {
@@ -34,12 +35,12 @@ class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
val value = new Text()
val key = new Text()
- var reader: SequenceFile.Reader = null
+ var reader: SequenceFile.Reader = _
val hadoopConf = config.hadoopConf
val fs = FileSystem.get(hadoopConf)
val inputPath = new Path(config.getString(INPUT_PATH).get)
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
self ! Start
LOG.info("sequence file spout initiated")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
index 7831b14..2edb87f 100644
--- a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.streaming.examples.fsio
import java.io.File
+import java.time.Instant
import scala.collection.mutable.ArrayBuffer
import akka.actor.ActorSystem
@@ -33,7 +34,7 @@ import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.task.{StartTime, TaskId}
+import org.apache.gearpump.streaming.task.TaskId
import org.apache.gearpump.streaming.{MockUtil, Processor}
class SeqFileStreamProcessorSpec
extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
@@ -67,7 +68,7 @@ class SeqFileStreamProcessorSpec
when(context.taskId).thenReturn(taskId)
val processor = new SeqFileStreamProcessor(context, conf)
- processor.onStart(StartTime(0))
+ processor.onStart(Instant.EPOCH)
forAll(kvGenerator) { kv =>
val (key, value) = kv
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
index ad27e63..a03e68d 100644
--- a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.streaming.examples.fsio
+import java.time.Instant
+
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.conf.Configuration
@@ -32,7 +34,6 @@ import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.MockUtil._
-import org.apache.gearpump.streaming.task.StartTime
class SeqFileStreamProducerSpec
extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
@@ -73,7 +74,7 @@ class SeqFileStreamProducerSpec
val context = MockUtil.mockTaskContext
val producer = new SeqFileStreamProducer(context, conf)
- producer.onStart(StartTime(0))
+ producer.onStart(Instant.EPOCH)
producer.onNext(Message("start"))
val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
index a95f596..b78e788 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
@@ -18,16 +18,17 @@
package org.apache.gearpump.streaming.examples.kafka.wordcount
-import com.twitter.bijection.Injection
+import java.time.Instant
+import com.twitter.bijection.Injection
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
}
override def onNext(msg: Message): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
index 9930b92..58bb884 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
@@ -18,18 +18,19 @@
package org.apache.gearpump.streaming.examples.kafka.wordcount
-import com.twitter.bijection.Injection
+import java.time.Instant
+import com.twitter.bijection.Injection
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
private[wordcount] var wordcount = Map.empty[String, Long]
- override def onStart(startTime: StartTime): Unit = {}
+ override def onStart(startTime: Instant): Unit = {}
override def onNext(message: Message): Unit = {
val word = message.msg.asInstanceOf[String]
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
index 3538ece..e37118a 100644
--- a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
+++ b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.streaming.examples.kafka.wordcount
+import java.time.Instant
+
import scala.collection.mutable
import org.mockito.Matchers._
@@ -27,7 +29,6 @@ import org.scalatest.{FlatSpec, Matchers}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
class SumSpec extends FlatSpec with Matchers {
@@ -39,7 +40,7 @@ class SumSpec extends FlatSpec with Matchers {
val taskContext = MockUtil.mockTaskContext
val sum = new Sum(taskContext, UserConfig.empty)
- sum.onStart(StartTime(0))
+ sum.onStart(Instant.EPOCH)
val str = "once two two three three three"
var totalWordCount = 0
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
index 796b0d2..a16cf4c 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.streaming.examples.sol
+import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
@@ -25,7 +26,7 @@ import akka.actor.Cancellable
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig)
extends Task(taskContext, conf) {
@@ -38,7 +39,7 @@ class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig)
private var snapShotWordCount: Long = 0
private var snapShotTime: Long = 0
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount())
snapShotTime = System.currentTimeMillis()
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
index 84ed038..c1b11e5 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
@@ -18,12 +18,13 @@
package org.apache.gearpump.streaming.examples.sol
+import java.time.Instant
import java.util.Random
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.examples.sol.SOLStreamProducer._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
extends Task(taskContext, conf) {
@@ -36,7 +37,7 @@ class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
private var rand: Random = null
private var messageCount: Long = 0
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
prepareRandomMessage
self ! Start
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
index a6cc966..e3344bf 100644
--- a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.streaming.examples.sol
+import java.time.Instant
+
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.{FlatSpec, Matchers}
@@ -24,7 +26,6 @@ import org.scalatest.{FlatSpec, Matchers}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
class SOLStreamProcessorSpec extends FlatSpec with Matchers {
@@ -33,7 +34,7 @@ class SOLStreamProcessorSpec extends FlatSpec with Matchers {
val context = MockUtil.mockTaskContext
val sol = new SOLStreamProcessor(context, UserConfig.empty)
- sol.onStart(StartTime(0))
+ sol.onStart(Instant.EPOCH)
val msg = Message("msg")
sol.onNext(msg)
verify(context, times(1)).output(msg)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
index 2316de8..dc21171 100644
--- a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.streaming.examples.sol
+import java.time.Instant
+
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{Matchers, WordSpec}
@@ -24,7 +26,6 @@ import org.scalatest.{Matchers, WordSpec}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
class SOLStreamProducerSpec extends WordSpec with Matchers {
@@ -35,7 +36,7 @@ class SOLStreamProducerSpec extends WordSpec with Matchers {
val context = MockUtil.mockTaskContext
val producer = new SOLStreamProducer(context, conf)
- producer.onStart(StartTime(0))
+ producer.onStart(Instant.EPOCH)
producer.onNext(Message("msg"))
verify(context).output(any[Message])
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
index 0e85f32..134afba 100644
--- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
@@ -18,17 +18,19 @@
package org.apache.gearpump.streaming.examples.state.processor
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
extends Task(taskContext, conf) {
import taskContext.output
private var num = 0L
- override def onStart(startTime: StartTime): Unit = {
- num = startTime.startTime
+ override def onStart(startTime: Instant): Unit = {
+ num = startTime.toEpochMilli
self ! Message("start")
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
index 6048034..b95d164 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
@@ -18,6 +18,8 @@
package org.apache.gearpump.streaming.examples.state.processor
+import java.time.Instant
+
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -33,7 +35,7 @@ import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.state.api.PersistentTask
import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig}
-import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime}
+import org.apache.gearpump.streaming.task.UpdateCheckpointClock
import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
@@ -59,7 +61,7 @@ class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
val appMaster = TestProbe()(system)
when(taskContext.appMaster).thenReturn(appMaster.ref)
- count.onStart(StartTime(0L))
+ count.onStart(Instant.EPOCH)
appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L))
for (i <- 0L to num) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
index 2268994..d3f645c 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
@@ -18,6 +18,8 @@
package org.apache.gearpump.streaming.examples.state.processor
+import java.time.Instant
+
import scala.concurrent.Await
import scala.concurrent.duration.Duration
@@ -30,7 +32,6 @@ import org.scalatest.{Matchers, WordSpec}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
"NumberGeneratorProcessor" should {
@@ -47,7 +48,7 @@ class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
val conf = UserConfig.empty
val genNum = new NumberGeneratorProcessor(taskContext, conf)
- genNum.onStart(StartTime(0))
+ genNum.onStart(Instant.EPOCH)
mockTaskActor.expectMsgType[Message]
genNum.onNext(Message("next"))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
index 0963429..255f869 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
@@ -18,6 +18,8 @@
package org.apache.gearpump.streaming.examples.state.processor
+import java.time.Instant
+
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -34,7 +36,7 @@ import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.state.api.PersistentTask
import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig, WindowConfig}
-import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime}
+import org.apache.gearpump.streaming.task.UpdateCheckpointClock
import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Matchers {
@@ -61,7 +63,7 @@ class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Match
val appMaster = TestProbe()(system)
when(taskContext.appMaster).thenReturn(appMaster.ref)
- windowAverage.onStart(StartTime(0L))
+ windowAverage.onStart(Instant.EPOCH)
appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L))
for (i <- 0L until num) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/README.md b/examples/streaming/stockcrawler/README.md
deleted file mode 100644
index b51590f..0000000
--- a/examples/streaming/stockcrawler/README.md
+++ /dev/null
@@ -1,19 +0,0 @@
-How to use
-===================
-1. Start local cluster,
- ```
- bin/local
- ```
-2. Submit the stock crawler
- ```
- bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock
- ```
-
- If you are behind a proxy, you need to set the proxy address
- ```
- bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock -proxy host:port
- ```
-
-3. Check the UI
- http://127.0.0.1:8080
-
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf b/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
deleted file mode 100644
index acee3bd..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
+++ /dev/null
@@ -1,9 +0,0 @@
-gearpump {
- serializers {
- "org.apache.gearpump.streaming.examples.stock.StockPrice" = ""
- }
-}
-
-spray.can {
- server.parsing.max-content-length = "10M"
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png
deleted file mode 100644
index b5c536c..0000000
Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css b/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
deleted file mode 100644
index 182d722..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-.ui-datepicker {
- font-size: 11px;
-}
-
-.sidebar-label {
- font-size: 15px;
- font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-.help {
- font-size: 12px;
- font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-div.splitter {
- margin: 12px 0px 7px 0px;
- clear: both;
- border-top: 1px solid #EBEBEB;
-}
-
-input.sidebar {
- width: 165px
-}
-
-select.sidebar {
- width: 198px
-}
-
-table.dataintable {
- font-family: calibri, Arial, Helvetica, sans-serif;
- font-size: 15px;
- margin-top: 10px;
- border-collapse: collapse;
- border: 1px solid #888;
-}
-
-table.dataintable th {
- vertical-align: baseline;
- padding: 5px 15px 5px 5px;
- background-color: #EEE;
- border: 1px solid #888;
- text-align: left;
-}
-
-table.dataintable td {
- vertical-align: text-top;
- padding: 5px 15px 5px 5px;
- background-color: #FFFFFF;
- border: 1px solid #AAA;
-}
-
-#search {
- width: 100px;
- height: 25px;
- position: relative;
- left: 0px;
- top: 5px;
-}
-
-#mytable {
- width: 100%;
- height: 300;
- float: left;
-}
-
-#mychart {
- height: 250px;
- width: 100%;
-}
-
-#Menu {
- height: 100%;
- width: 245px;
- float: left;
-}
-
-#header {
- height: 115px;
- background-image: url(header.png);
-}
-
-#body {
- height: 100%;
- width: 100%;
- background-image: url(body.png);
- background-size: 100% 100%;
-}
-
-#footer {
- color: white;
- height: 70px;
- line-height: 70px;
- text-align: middle;
- clear: both;
- text-align: center;
- background-image: url(foot.png);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png
deleted file mode 100644
index 5db91b5..0000000
Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png
deleted file mode 100644
index 9284e44..0000000
Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js b/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
deleted file mode 100644
index 97f1e07..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-function initChart(chartid, tableid, stockId) {
- require.config({
- paths: {
- echarts: 'http://echarts.baidu.com/build/dist'
- }
- });
-
- require(
- [
- 'echarts',
- 'echarts/chart/line'
- ],
-
- function (ec) {
- // \u57fa\u4e8e\u51c6\u5907\u597d\u7684dom\uff0c\u521d\u59cb\u5316echarts\u56fe\u8868
- var myChart = ec.init(document.getElementById(chartid));
- var dataPoints = 100;
- var timeTicket;
- clearInterval(timeTicket);
- timeTicket = setInterval(function () {
- $.getJSON("report/" + stockId, function (json) {
- STOCK_NAME = json.name
-
- var maxDrawnDown = json.currentMax[0].max.price - json.currentMax[0].min.price;
- var time = new Date(json.currentMax[0].current.timestamp).toLocaleTimeString().replace(/^\D*/, '');
- // \u52a8\u6001\u6570\u636e\u63a5\u53e3 addData
- myChart.addData([
- [
- 0, // \u7cfb\u5217\u7d22\u5f15
- maxDrawnDown.toFixed(2), // \u65b0\u589e\u6570\u636e
- false, // \u65b0\u589e\u6570\u636e\u662f\u5426\u4ece\u961f\u5217\u5934\u90e8\u63d2\u5165
- false, // \u662f\u5426\u589e\u52a0\u961f\u5217\u957f\u5ea6\uff0cfalse\u5219\u81ea\u5b9a\u5220\u9664\u539f\u6709\u6570\u636e\uff0c\u961f\u5934\u63d2\u5165\u5220\u961f\u5c3e\uff0c\u961f\u5c3e\u63d2\u5165\u5220\u961f\u5934
- time
- ],
- [
- 1, // \u7cfb\u5217\u7d22\u5f15
- json.currentMax[0].current.price.toFixed(2), // \u65b0\u589e\u6570\u636e
- false, // \u65b0\u589e\u6570\u636e\u662f\u5426\u4ece\u961f\u5217\u5934\u90e8\u63d2\u5165
- false, // \u662f\u5426\u589e\u52a0\u961f\u5217\u957f\u5ea6\uff0cfalse\u5219\u81ea\u5b9a\u5220\u9664\u539f\u6709\u6570\u636e\uff0c\u961f\u5934\u63d2\u5165\u5220\u961f\u5c3e\uff0c\u961f\u5c3e\u63d2\u5165\u5220\u961f\u5934
- time
- ]
- ]);
- document.getElementById(chartid).style.display = "block"
- document.getElementById(tableid).innerHTML = "<pre>" + JSON.stringify(json, null, 2) + "</pre>"
- });
- }, 2000);
-
- var subtext_ = "Draw Down"
-
- var option = {
- title: {
- text: 'Stock Analysis',
- subtext: "Max " + subtext_
- },
- tooltip: {
- trigger: 'axis'
- },
- legend: {
- data: ["Current Price", "Current Draw Down"]
- },
- toolbox: {
- show: false,
- feature: {
- mark: {show: true},
- dataView: {show: true, readOnly: false},
- magicType: {show: true, type: ['line', 'bar']},
- restore: {show: true},
- saveAsImage: {show: true}
- }
- },
- dataZoom: {
- show: false,
- start: 0,
- end: 100
- },
- xAxis: [
- {
- type: 'category',
- boundaryGap: true,
- data: (function () {
- var now = new Date();
- var res = [];
- var len = dataPoints;
- while (len--) {
- res.unshift(now.toLocaleTimeString().replace(/^\D*/, ''));
- now = new Date(now - 2000);
- }
- return res;
- })()
- }
- ],
- yAxis: [
- {
- type: 'value',
- scale: true,
- name: subtext_ + ' \u4ef7\u683c/\u5143',
- boundaryGap: [0, 0.3]
- },
- {
- type: 'value',
- scale: true,
- name: 'Current \u4ef7\u683c/\u5143',
- boundaryGap: [0, 0.1]
- }
- ],
- series: [
- {
- name: "Current Draw Down",
- type: 'line',
- data: (function () {
- var res = [];
- var len = dataPoints;
- while (len--) {
- res.push(0);
- }
- return res;
- })()
- },
- {
- name: "Current Price",
- type: 'line',
- yAxisIndex: 1,
- data: (function () {
- var res = [];
- var len = dataPoints;
- while (len--) {
- res.push(0);
- }
- return res;
- })()
- }
- ]
- };
-
- // \u4e3aecharts\u5bf9\u8c61\u52a0\u8f7d\u6570\u636e
- myChart.setOption(option);
- }
- );
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html b/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
deleted file mode 100644
index 9682a53..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
+++ /dev/null
@@ -1,87 +0,0 @@
-<!DOCTYPE html>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<html>
-
-<head>
- <meta charset="utf-8">
- <link rel=stylesheet type=text/css href="css/custom.css">
- <script src="http://echarts.baidu.com/build/dist/echarts.js"></script>
- <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
- <script src="js/stock.js"></script>
- <script type="text/javascript">
- function search_onclick() {
- var stockId = document.getElementById('stockId').value
- initChart("mychart", "mytable", stockId)
- }
- </script>
-</head>
-
-<body style="background-color:#F2F2F2">
-<div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;">
- <div style="height:0px"></div>
- <div id="header">
- <div
- style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
- Big Data Stock Analysis Demo
- </div>
- </div>
- <div id="body">
- <div id="Menu">
- <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;">
- <!-- form to post to accompany to get accompanying cars -->
-
- <table style="width:100%">
- <tr>
- <td class="sidebar-label">Stock Id:</td>
- </tr>
- <tr>
- <td class="sidebar-label">Example: sh600019, sz000002</td>
- </tr>
- <tr>
- <td style="vertical-align:top;">
- <input id="stockId" class="sidebar" type="text" name="stockId"/>
- </td>
- </tr>
- </table>
- <div class="splitter"></div>
- <div>
- <button id="search" onclick="search_onclick()">Search</button>
- </div>
- </div>
- </div>
- <div id="content"
- style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
- <div
- style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
- Analysis Result:
- </div>
- <div style="height:7px;background-color:#92BDF2;"></div>
-
- <div id="mychart"></div>
-
- <div id="mytable"></div>
- </div>
- </div>
- <div id="footer">
- Big Data Team @ Intel
- </div>
-</div>
-</body>
-</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
deleted file mode 100644
index f6fdff2..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import scala.collection.immutable
-
-import akka.actor.Actor.Receive
-import org.joda.time.DateTime
-import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.examples.stock.Analyzer.HistoricalStates
-import org.apache.gearpump.streaming.examples.stock.Price._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Dradown analyzer
- * Definition: http://en.wikipedia.org/wiki/Drawdown_(economics)
- */
-class Analyzer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
- val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
-
- private var stocksToReport = immutable.Set.empty[String]
- private var stockInfos = new immutable.HashMap[String, StockPrice]
-
- private var currentDownwardsStates = new immutable.HashMap[String, StockPriceState]
- private val historicalStates = new HistoricalStates()
- private var latestTimeStamp: Long = 0L
-
- override def onStart(startTime: StartTime): Unit = {
- LOG.info("analyzer is started")
- }
-
- override def onNext(msg: Message): Unit = {
- msg.msg match {
- case stock: StockPrice =>
- latestTimeStamp = stock.timestamp
- checkDate(stock)
- stockInfos += stock.stockId -> stock
- val downwardsState = updateCurrentStates(stock)
- val maxDrawdown = historicalStates.updatePresentMaximal(downwardsState)
- }
- }
-
- override def receiveUnManagedMessage: Receive = {
- case get@GetReport(stockId, date) =>
- var currentMax = currentDownwardsStates.get(stockId)
-
- val dateTime = Option(date) match {
- case Some(date) =>
- currentMax = None
- parseDate(dateFormatter, date)
- case None =>
- new DateTime(latestTimeStamp).withTimeAtStartOfDay
- }
-
- val historyMax = Option(dateTime).flatMap(handleHistoricalQuery(stockId, _))
- val name = stockInfos.get(stockId).map(_.name).getOrElse("")
- sender ! Report(stockId, name, dateTime.toString, historyMax, currentMax)
- }
-
- private def updateCurrentStates(stock: StockPrice) = {
- var downwardsState: StockPriceState = null
- if (currentDownwardsStates.contains(stock.stockId)) {
- downwardsState = generateNewState(stock, currentDownwardsStates.get(stock.stockId).get)
- } else {
- downwardsState = StockPriceState(stock.stockId, stock, stock, stock)
- }
- currentDownwardsStates += stock.stockId -> downwardsState
- downwardsState
- }
-
- // Update the stock's latest state.
- private def generateNewState(currentPrice: Price, oldState: StockPriceState): StockPriceState = {
- if (currentPrice.price > oldState.max.price) {
- StockPriceState(oldState.stockID, currentPrice, currentPrice, currentPrice)
- } else {
- val newState = StockPriceState(oldState.stockID, oldState.max,
- Price.min(currentPrice, oldState.min), currentPrice)
- newState
- }
- }
-
- private def checkDate(stock: StockPrice) = {
- if (currentDownwardsStates.contains(stock.stockId)) {
- val now = new DateTime(stock.timestamp)
- val lastTime = new DateTime(currentDownwardsStates.get(stock.stockId).get.current.timestamp)
- // New day
- if (now.getDayOfYear > lastTime.getDayOfYear || now.getYear > lastTime.getYear) {
- currentDownwardsStates -= stock.stockId
- }
- }
- }
-
- private def parseDate(format: DateTimeFormatter, input: String): DateTime = {
- format.parseDateTime(input)
- }
-
- private def handleHistoricalQuery(stockId: String, date: DateTime) = {
- val maximal = historicalStates.getHistoricalMaximal(stockId, date)
- maximal
- }
-}
-
-object Analyzer {
-
- class HistoricalStates {
- val LOG = LogUtil.getLogger(getClass)
- val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
- private var historicalMaxRaise = new immutable.HashMap[(String, DateTime), StockPriceState]
- private var historicalMaxDrawdown = new immutable.HashMap[(String, DateTime), StockPriceState]
-
- def updatePresentMaximal(newState: StockPriceState): Option[StockPriceState] = {
- val date = Analyzer.getDateFromTimeStamp(newState.current.timestamp)
- var newMaximalState: Option[StockPriceState] = null
- if (newState.max.price < Float.MinPositiveValue) {
- newMaximalState = generateNewMaximal(newState, date, historicalMaxRaise)
- if (newMaximalState.nonEmpty) {
- historicalMaxRaise += (newState.stockID, date) -> newMaximalState.get
- }
- } else {
- newMaximalState = generateNewMaximal(newState, date, historicalMaxDrawdown)
- if (newMaximalState.nonEmpty) {
- historicalMaxDrawdown += (newState.stockID, date) -> newMaximalState.get
- }
- }
- newMaximalState
- }
-
- def getHistoricalMaximal(stockId: String, date: DateTime): Option[StockPriceState] = {
- historicalMaxDrawdown.get((stockId, date))
- }
-
- private def generateNewMaximal(
- state: StockPriceState,
- date: DateTime,
- map: immutable.HashMap[(String, DateTime), StockPriceState])
- : Option[StockPriceState] = {
- val maximal = map.get((state.stockID, date))
- if (maximal.nonEmpty && maximal.get.drawDown > state.drawDown) {
- None
- } else {
- Some(state)
- }
- }
- }
-
- def getDateFromTimeStamp(timestamp: Long): DateTime = {
- new DateTime(timestamp).withTimeAtStartOfDay()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
deleted file mode 100644
index bb444dd..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import scala.concurrent.duration._
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Crawler(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
- import taskContext._
-
- val FetchStockPrice = Message("FetchStockPrice")
-
- lazy val stocks = {
- val stockIds = conf.getValue[Array[String]]("StockId").get
- val size = if (stockIds.length % parallelism > 0) {
- stockIds.length / parallelism + 1
- } else {
- stockIds.length / parallelism
- }
-
- val start = taskId.index * size
- val end = (taskId.index + 1) * size
- stockIds.slice(start, end)
- }
-
- scheduleOnce(1.seconds)(self ! FetchStockPrice)
-
- val stockMarket = conf.getValue[StockMarket](classOf[StockMarket].getName).get
-
- override def onStart(startTime: StartTime): Unit = {
- // Nothing
- }
-
- override def onNext(msg: Message): Unit = {
- stockMarket.getPrice(stocks).foreach { price =>
- output(new Message(price, price.timestamp))
- }
- scheduleOnce(5.seconds)(self ! FetchStockPrice)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
deleted file mode 100644
index 94a85ff..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-// scalastyle:off equals.hash.code case class has equals defined
-case class StockPrice(
- stockId: String, name: String, price: String, delta: String, pecent: String, volume: String,
- money: String, timestamp: Long) {
- override def hashCode: Int = stockId.hashCode
-}
-// scalastyle:on equals.hash.code case class has equals defined
-
-case class Price(price: Float, timestamp: Long)
-
-object Price {
-
- import scala.language.implicitConversions
-
- implicit def StockPriceToPrice(stock: StockPrice): Price = {
- Price(stock.price.toFloat, stock.timestamp)
- }
-
- def min(first: Price, second: Price): Price = {
- if (first.price < second.price) {
- first
- } else {
- second
- }
- }
-}
-
-case class StockPriceState(stockID: String, max: Price, min: Price, current: Price) {
-
- def drawDownPeriod: Long = min.timestamp - max.timestamp
-
- def recoveryPeriod: Long = current.timestamp - min.timestamp
-
- def drawDown: Float = max.price - min.price
-}
-
-case class GetReport(stockId: String, date: String)
-
-case class Report(
- stockId: String, name: String, date: String, historyMax: Option[StockPriceState],
- currentMax: Option[StockPriceState])
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
deleted file mode 100644
index 01ccb3e..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-import akka.actor.Actor._
-import akka.actor.{Actor, ActorRefFactory, Props}
-import akka.io.IO
-import akka.pattern.ask
-import spray.can.Http
-import spray.http.StatusCodes
-import spray.json._
-import spray.routing.{HttpService, Route}
-import upickle.default.write
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterDataDetailRequest
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary}
-import org.apache.gearpump.streaming.examples.stock.QueryServer.WebServer
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-
-class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import scala.concurrent.ExecutionContext.Implicits.global
-
- import taskContext.{appId, appMaster}
-
- var analyzer: (ProcessorId, ProcessorSummary) = null
- implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-
- override def onStart(startTime: StartTime): Unit = {
- appMaster ! AppMasterDataDetailRequest(appId)
- taskContext.actorOf(Props(new WebServer))
- }
-
- override def onNext(msg: Message): Unit = {
- // Skip
- }
-
- override def receiveUnManagedMessage: Receive = messageHandler
-
- def messageHandler: Receive = {
- case detail: StreamAppMasterSummary =>
- analyzer = detail.processors.find { kv =>
- val (processorId, processor) = kv
- processor.taskClass == classOf[Analyzer].getName
- }.get
- case getReport@GetReport(stockId, date) =>
- val parallism = analyzer._2.parallelism
- val processorId = analyzer._1
- val analyzerTaskId = TaskId(processorId, (stockId.hashCode & Integer.MAX_VALUE) % parallism)
- val requester = sender
- import scala.concurrent.Future
- (appMaster ? LookupTaskActorRef(analyzerTaskId))
- .asInstanceOf[Future[TaskActorRef]].flatMap { task =>
-
- (task.task ? getReport).asInstanceOf[Future[Report]]
- }.map { report =>
- LOG.info(s"reporting $report")
- requester ! report
- }
- case _ =>
- // Ignore
- }
-}
-
-object QueryServer {
- class WebServer extends Actor with HttpService {
-
- import context.dispatcher
- implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
- def actorRefFactory: ActorRefFactory = context
- implicit val system = context.system
-
- IO(Http) ! Http.Bind(self, interface = "localhost", port = 8080)
-
- override def receive: Receive = runRoute(webServer ~ staticRoute)
-
- def webServer: Route = {
- path("report" / Segment) { stockId =>
- get {
- onComplete((context.parent ? GetReport(stockId, null)).asInstanceOf[Future[Report]]) {
- case Success(report: Report) =>
- val json = write(report)
- complete(pretty(json))
- case Failure(ex) => complete(StatusCodes.InternalServerError,
- s"An error occurred: ${ex.getMessage}")
- }
- }
- }
- }
-
- val staticRoute = {
- pathEndOrSingleSlash {
- getFromResource("stock/stock.html")
- } ~
- pathPrefix("css") {
- get {
- getFromResourceDirectory("stock/css")
- }
- } ~
- pathPrefix("js") {
- get {
- getFromResourceDirectory("stock/js")
- }
- }
- }
-
- private def pretty(json: String): String = {
- json.parseJson.prettyPrint
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
deleted file mode 100644
index 24e050b..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import java.nio.charset.Charset
-import scala.io.Codec
-
-import org.apache.commons.httpclient.methods.GetMethod
-import org.apache.commons.httpclient.{HttpClient, MultiThreadedHttpConnectionManager}
-import org.htmlcleaner.{HtmlCleaner, TagNode}
-import org.joda.time.{DateTime, DateTimeZone}
-
-import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.LogUtil
-
-class StockMarket(service: ServiceHour, proxy: HostPort = null) extends Serializable {
-
- private def LOG = LogUtil.getLogger(getClass)
-
- @transient
- private var connectionManager: MultiThreadedHttpConnectionManager = null
-
- private val eastMoneyStockPage = "http://quote.eastmoney.com/stocklist.html"
-
- private val stockPriceParser =
- """^var\shq_str_s_([a-z0-9A-Z]+)="([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)";$""".r
-
- def shutdown(): Unit = {
- Option(connectionManager).map(_.shutdown())
- }
-
- @transient
- private var _client: HttpClient = null
-
- private def client: HttpClient = {
- _client = Option(_client).getOrElse {
- val connectionManager = new MultiThreadedHttpConnectionManager()
- val client = new HttpClient(connectionManager)
- Option(proxy).map(host => client.getHostConfiguration().setProxy(host.host, host.port))
- client
- }
- _client
- }
-
- def getPrice(stocks: Array[String]): Array[StockPrice] = {
-
- LOG.info(s"getPrice 1")
-
- val query = "http://hq.sinajs.cn/list=" + stocks.map("s_" + _).mkString(",")
- if (service.inService) {
-
- LOG.info(s"getPrice 2")
-
- val get = new GetMethod(query)
- client.executeMethod(get)
- val current = System.currentTimeMillis()
-
- val output = scala.io.Source.fromInputStream(get.getResponseBodyAsStream)(
- new Codec(Charset forName "GBK")).getLines().flatMap { line =>
- line match {
- case stockPriceParser(stockId, name, price, delta, pecent, volume, money) =>
- Some(StockPrice(stockId, name, price, delta, pecent, volume, money, current))
- case _ =>
- None
- }
- }.toArray
-
- LOG.info(s"getPrice 3 ${output.length}")
-
- output
- } else {
- Array.empty[StockPrice]
- }
- }
-
- private val urlPattern = """^.*/([a-zA-Z0-9]+)\.html$""".r
-
- def getStockIdList: Array[String] = {
- val cleaner = new HtmlCleaner
- val props = cleaner.getProperties
-
- val get = new GetMethod(eastMoneyStockPage)
- client.executeMethod(get)
-
- val root = cleaner.clean(get.getResponseBodyAsStream)
-
- val stockUrls = root.evaluateXPath("//div[@id='quotesearch']//li//a[@href]")
-
- val elements = root.getElementsByName("a", true)
-
- val hrefs = (0 until stockUrls.length)
- .map(stockUrls(_).asInstanceOf[TagNode].getAttributeByName("href"))
- .map { url =>
- url match {
- case urlPattern(code) => code
- case _ => null
- }
- }.toArray
- hrefs
- }
-}
-
-object StockMarket {
-
- class ServiceHour(all: Boolean) extends Serializable {
-
- /**
- * Morning openning: 9:30 am - 11:30 am
- */
- val morningStart = GMT8(new DateTime(0, 1, 1, 9, 30)).getMillis
- val morningEnd = GMT8(new DateTime(0, 1, 1, 11, 30)).getMillis
-
- /**
- * After noon openning: 13:00 pm - 15:00 pm
- */
- val afternoonStart = GMT8(new DateTime(0, 1, 1, 13, 0)).getMillis
- val afternoonEnd = GMT8(new DateTime(0, 1, 1, 15, 0)).getMillis
-
- def inService: Boolean = {
-
- if (all) {
- true
- } else {
- val now = GMT8(DateTime.now()).withDate(0, 1, 1).getMillis
- if (now >= morningStart && now <= morningEnd ||
- now >= afternoonStart && now <= afternoonEnd) {
- true
- } else {
- false
- }
- }
- }
-
- private def GMT8(time: DateTime): DateTime = {
- time.withZone(DateTimeZone.UTC).plusHours(8)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
deleted file mode 100644
index 6d17c20..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock.main
-
-import akka.actor.ActorSystem
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
-import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour
-import org.apache.gearpump.streaming.examples.stock.{Analyzer, Crawler, QueryServer, StockMarket}
-import org.apache.gearpump.streaming.{Processor, StreamApplication}
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.Graph.Node
-import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
-
-/** Tracks the China's stock market index change */
-object Stock extends AkkaApp with ArgumentsParser {
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- "crawler" -> CLIOption[Int]("<how many fetcher to get data from remote>",
- required = false, defaultValue = Some(10)),
- "analyzer" -> CLIOption[Int]("<parallism of analyzer>",
- required = false, defaultValue = Some(1)),
- "proxy" -> CLIOption[String]("proxy setting host:port, for example: 127.0.0.1:8443",
- required = false, defaultValue = Some("")))
-
- def crawler(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
- val crawler = Processor[Crawler](config.getInt("crawler"))
- val analyzer = Processor[Analyzer](config.getInt("analyzer"))
- val queryServer = Processor[QueryServer](1)
-
- val proxySetting = config.getString("proxy")
- val proxy = if (proxySetting.isEmpty) {
- null
- } else HostPort(proxySetting)
- val stockMarket = new StockMarket(new ServiceHour(true), proxy)
- val stocks = stockMarket.getStockIdList
-
- // scalastyle:off println
- Console.println(s"Successfully fetched stock id for ${stocks.length} stocks")
- // scalastyle:on println
-
- val userConfig = UserConfig.empty.withValue("StockId", stocks)
- .withValue[StockMarket](classOf[StockMarket].getName, stockMarket)
- val partitioner = new HashPartitioner
-
- val p1 = crawler ~ partitioner ~> analyzer
- val p2 = Node(queryServer)
- val graph = Graph(p1, p2)
- val app = StreamApplication("stock_direct_analyzer", graph, userConfig
- )
- app
- }
-
- override def main(akkaConf: Config, args: Array[String]): Unit = {
- val config = parse(args)
- val context = ClientContext(akkaConf)
-
- implicit val system = context.system
-
- val app = crawler(config)
- val appId = context.submit(app)
- context.close()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/README.md b/examples/streaming/transport/README.md
deleted file mode 100644
index fc9bdfe..0000000
--- a/examples/streaming/transport/README.md
+++ /dev/null
@@ -1,3 +0,0 @@
-What is this?
-=============
-A smart transportation example which simulate a city with millions of cars.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/geardefault.conf b/examples/streaming/transport/src/main/resources/geardefault.conf
deleted file mode 100644
index 0c8f421..0000000
--- a/examples/streaming/transport/src/main/resources/geardefault.conf
+++ /dev/null
@@ -1,12 +0,0 @@
-gearpump {
-
- serializers {
- ## Follow this format when adding new serializer for new message types
- ## "org.apache.gearpump.Message" = "org.apache.gearpump.streaming.MessageSerializer"
- "org.apache.gearpump.streaming.examples.transport.PassRecord" = ""
- }
-}
-
-spray.can {
- server.parsing.max-content-length = "10M"
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/body.png
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/body.png b/examples/streaming/transport/src/main/resources/transport/css/body.png
deleted file mode 100644
index b5c536c..0000000
Binary files a/examples/streaming/transport/src/main/resources/transport/css/body.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/custom.css
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/custom.css b/examples/streaming/transport/src/main/resources/transport/css/custom.css
deleted file mode 100644
index f324b6a..0000000
--- a/examples/streaming/transport/src/main/resources/transport/css/custom.css
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-.ui-datepicker {
- font-size: 11px;
-}
-
-.sidebar-label {
- font-size: 15px;
- font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-.help {
- font-size: 12px;
- font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-div.splitter {
- margin: 12px 0px 7px 0px;
- clear: both;
- border-top: 1px solid #EBEBEB;
-}
-
-input.sidebar {
- width: 165px
-}
-
-select.sidebar {
- width: 198px
-}
-
-table.dataintable {
- font-family: calibri, Arial, Helvetica, sans-serif;
- font-size: 15px;
- margin-top: 10px;
- border-collapse: collapse;
- border: 1px solid #888;
-}
-
-table.dataintable th {
- vertical-align: baseline;
- padding: 5px 15px 5px 5px;
- background-color: #EEE;
- border: 1px solid #888;
- text-align: left;
-}
-
-table.dataintable td {
- vertical-align: text-top;
- padding: 5px 15px 5px 5px;
- background-color: #FFFFFF;
- border: 1px solid #AAA;
-}
-
-#search {
- width: 100px;
- height: 25px;
- position: relative;
- left: 0px;
- top: 5px;
-}
-
-#mytable {
- width: 100%;
- height: 300;
- float: left;
-}
-
-#mychart {
- height: 400px;
- width: 100%;
-}
-
-#Menu {
- height: 100%;
- width: 245px;
- float: left;
-}
-
-#header {
- height: 115px;
- background-image: url(header.png);
-}
-
-#body {
- height: 100%;
- width: 100%;
- background-image: url(body.png);
- background-size: 100% 100%;
-}
-
-#footer {
- color: white;
- height: 70px;
- line-height: 70px;
- text-align: middle;
- clear: both;
- text-align: center;
- background-image: url(foot.png);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/foot.png
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/foot.png b/examples/streaming/transport/src/main/resources/transport/css/foot.png
deleted file mode 100644
index 5db91b5..0000000
Binary files a/examples/streaming/transport/src/main/resources/transport/css/foot.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/header.png
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/header.png b/examples/streaming/transport/src/main/resources/transport/css/header.png
deleted file mode 100644
index 9284e44..0000000
Binary files a/examples/streaming/transport/src/main/resources/transport/css/header.png and /dev/null differ